Re: Data (re)processing with Kafka (new wiki page)

2016-11-25 Thread saiprasad mishra
This page is really helpful. Thanks for putting this together.

Some nice-to-have features could be (not sure if this wiki page is the right place for them):

1) Pause and resume without having to start and stop.
It should drain all the in-flight computations before doing the actual pause,
and a notification that it has actually paused would be helpful.
This could be much more lightweight than stopping all tasks and
stores and starting them again, which might take a lot of time.
(A rough sketch of what such an API could look like is included after point 2.)


2) If the metadata API response could include all the topology graphs and
sub-graphs, with the nodes and edges of each sub-graph and the corresponding
state store names, it would be easier to build some UI on top of it. The
Kafka topics that act as the pipes between the sub-graphs should also be
called out, and the time semantics could be laid out on top of that. This
would be something like a logical view on top of the physical view that the
current API exposes.
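Purely as an illustration of point 1 (no such API exists in Kafka Streams
today, so every name below is hypothetical), the shape of a pause/resume hook
could be roughly:

// Hypothetical sketch only -- Kafka Streams does not provide this API.
public interface PausableStreams {

    // Drain in-flight computations and flush state, then stop polling,
    // while keeping threads, tasks and state stores alive.
    void pause();

    // Resume polling and processing without re-creating tasks or stores.
    void resume();

    // Notification hook so the caller knows the pause has actually taken effect.
    void onPaused(Runnable listener);
}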


Regards
Sai

On Fri, Nov 25, 2016 at 12:53 AM, Michael Noll  wrote:

> Thanks a lot, Matthias!
>
> I have already begun to provide feedback.
>
> -Michael
>
>
>
> On Wed, Nov 23, 2016 at 11:41 PM, Matthias J. Sax 
> wrote:
>
> > Hi,
> >
> > we added a new wiki page that is supposed to collect data (re)processing
> > scenarios with Kafka:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams+Data+(Re)
> > Processing+Scenarios
> >
> > We have already added a couple of scenarios we think might be common and want
> > to invite all of you to add more. This helps to get a better overview of
> > requirements to enable new use cases.
> >
> > We are looking forward to your feedback!
> >
> >
> > -Matthias
> >
> >
>


Re: Kafka streaming changelog topic max.message.bytes exception

2016-11-10 Thread saiprasad mishra
Hi Sachin

Here is a possible general approach which might work for you in the absence of
any broadcast variable, with everything being local state, provided you can
adjust your processing to do this step before the aggregation.

Basically the idea here is to use a custom topology with a custom processor
backed by a persistent state store, and use it to change the data and send it
to a result topic so that the aggregation becomes easier. You can change the
incoming data based on the old data in this state store for a given key, or
process it however you want, and then forward it to a final topic.

Below is the link for a sample custom processor.

https://apache.googlesource.com/kafka/+/0.10.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java?autodive=0%2F%2F
Note that in this class the processor accesses its store in init(); with the
pseudo code below, the store name should be "oldvalues".

It could be done something like below (if you use the TopologyBuilder api)

Some pseudo code which you might have already seen in the docs

builder.addSource("mysource", "sourcetopic");

builder.addProcessor("myprocessor", WordCountProcessorDemo::new, "mysource");

StateStoreSupplier oldValueStore = Stores.create("oldvalues")
    .withKeys(stringSerde).withValues(stringSerde).persistent().build();

builder.addStateStore(oldValueStore, "myprocessor");

builder.addSink("mysink", "toAggregationTopic", "myprocessor");

Now create the final KTable from this toAggregationTopic, which has the
cleaned data.
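As a minimal sketch of such a processor (the class name and the cleanup logic
are made up; the store name matches the "oldvalues" store above, and this
follows the 0.10.x Processor API):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class OldValueCleanupProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, String> oldValues;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        // the store attached via builder.addStateStore(oldValueStore, "myprocessor")
        this.oldValues = (KeyValueStore<String, String>) context.getStateStore("oldvalues");
    }

    @Override
    public void process(String key, String value) {
        String previous = oldValues.get(key);
        // placeholder cleanup: decide what to keep based on the previously
        // seen value for this key, then remember the latest value
        String cleaned = (previous == null) ? value : value.replace(previous, "");
        oldValues.put(key, value);
        // forward downstream, i.e. towards the sink / toAggregationTopic
        context.forward(key, cleaned);
    }

    @Override
    public void punctuate(long timestamp) {
        // nothing scheduled in this sketch
    }

    @Override
    public void close() {
    }
}

The supplier passed to addProcessor() should return a new processor instance
on every call, so that each task gets its own instance.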


Hope this helps; ignore this if I have misunderstood your use case.

Regards

Sai








On Thu, Nov 10, 2016 at 8:40 PM, Sachin Mittal  wrote:

> Hi,
> On the subject of deleting values from the list, I have the following topology:
> aggregate(old list) = new list -> left join (another list) -> output sink.
>
> While processing the output list I know which values in the list are old
> and can be removed.
> Is there any way to pass that information from downstream back to upstream?
> Any thoughts around how can I pass this information.
>
> One thing I can think of is that I can set some global variable in the
> output sink.
>
> So next time aggregate function is run it can lookup the global variable
> and remove items from the list.
> So new list = old list + new value added - old values removed.
>
> In spark we have something like broadcast variables to do the same.
>
> Is there any similar concept in Kafka Streams?
>
> This way we can keep the changelog topic message from growing and prevent
> the max message bytes exception.
>
> Thanks
> Sachin
>
>
>
>
>
> On Fri, Nov 11, 2016 at 1:02 AM, Matthias J. Sax 
> wrote:
>
> > -BEGIN PGP SIGNED MESSAGE-
> > Hash: SHA512
> >
> > Sachin,
> >
> > my comment about deleting was about deleting values from the list, not
> > about deleting the whole key/value record.
> >
> > If you want to delete a whole key/value record when there is no update
> > for it for some time, you can combine compaction with retention. You
> > need to alter the configuration of the changelog topics and set
> >
> > cleanup.policy=compact,delete
> >
> > Then, retention.ms will be applied to the changelog, too.
> >
> >
> > - -Matthias
> >
> > On 11/10/16 3:17 AM, Sachin Mittal wrote:
> > > Hi, As per Eno suggestion I have pre-created internal changelog
> > > topics with increased max.message.bytes config to handle big
> > > messages that gets incremented over the time.
> > >
> > > As Matthias has pointed that we cannot use retention.ms setting to
> > > delete older message data after a given time, is there a way to
> > > purge older messages from my changelog topic.
> > >
> > > Remember my changelog topic is key=list of objects and this grows
> > > with time.
> > >
> > > So I would like these to be deleted from time to time because I
> > > would have already consumed the objects so that key/value can be
> > > deleted. Later if I get a new object for the same key then that's a
> > > new message and old data has no use for the streaming application.
> > >
> > > So how can I achieve the following. Would retention.bytes help
> > > here?
> > >
> > > Is there a way if i can set expire after or something like that at
> > > message level and some kafka thread would purge those messages.
> > >
> > > Thanks Sachin
> > >
> > >
> > >
> > > On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax
> > >  wrote:
> > >
> > > My two cents:
> > >
> > > Changelog topics are compacted topics, thus they do not have a
> > > retention time (there is an exception for windowed KTable changelog
> > > topics that are compacted and do have a retention time though).
> > >
> > > However, I do not understand how changing retention time should
> > > fix the issue. If your list of values grows and exceed
> > > max.message.byte you will need to increase this parameter (or
> > > shrink you value).
> > >
> > > Besides this, Eno's answer is the way to go. In order to figure
> > > out internal topic names, you can use 

Re: Reopen KAFKA-4344 ?

2016-11-07 Thread saiprasad mishra
Hi Srinivas

I raised that issue, and the way I got around it was to let Kafka Streams run
in a plain POJO way rather than having some of the dependent instances be
Spring-managed beans.
If you create the instances of riakService and counterService inside the
processor class, instead of passing the Spring-managed instances into the
processor class constructor, your Kafka Streams initialization should be fine
and it should create the right number of tasks, with the right number of
processors, for all the partitions.
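A rough sketch of what that looks like (RiakService here stands in for your
own service class; its constructor and the save() call are just placeholders):

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class MyProcessorSupplier implements ProcessorSupplier<String, String> {

    @Override
    public Processor<String, String> get() {
        // return a NEW processor (with its own service instances) on every call,
        // so every task/partition gets its own processor
        return new AbstractProcessor<String, String>() {
            // plain POJO instance created here, instead of an injected Spring singleton
            private final RiakService riakService = new RiakService();

            @Override
            public void process(String key, String value) {
                riakService.save(key, value);   // placeholder call
                context().forward(key, value);
            }
        };
    }
}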

I was fine with the POJO-based approach as Kafka Streams has quite a few APIs
to query the state (of course, once it is started correctly); I am running
stateful processors and wanted to query the state data all the time. I was
just using a Spring Boot controller in the web container to proxy the get()
APIs of the Kafka Streams state store (ReadOnlyKeyValueStore).
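For example (a sketch only; the store name "my-store" and the way the started
KafkaStreams instance reaches the controller are assumptions):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class StoreController {

    private final KafkaStreams streams;   // the already-started streams instance

    public StoreController(KafkaStreams streams) {
        this.streams = streams;
    }

    @GetMapping("/store/{key}")
    public String get(@PathVariable String key) {
        ReadOnlyKeyValueStore<String, String> store =
                streams.store("my-store", QueryableStoreTypes.<String, String>keyValueStore());
        return store.get(key);
    }
}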

Alternatively, you can try making these two services prototype-scoped
components (if your use case is fine with that).
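For the prototype-scoped alternative, that would be roughly (sketch only):

import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Component
@Scope("prototype")   // a new bean instance per injection/lookup, e.g. per processor
public class RiakService {
    // ... riak client code ...
}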

Hope this helps.

Regards
Sai

On Mon, Nov 7, 2016 at 9:08 AM, Matthias J. Sax 
wrote:

> -BEGIN PGP SIGNED MESSAGE-
> Hash: SHA512
>
> KAFKA-4344 was not a bug. The issue was a wrong initialization order
> of Kafka Streams by the user.
>
> Please double check your initialization order (and maybe read the old
> email thread and JIRA comments -- they might have some relevant
> information to help you fix the issue).
>
> If the problem is still there, can you please reduce your code to a
> minimum example that reproduces the problem?
>
> Thanks!
>
> - -Matthias
>
> On 11/5/16 3:28 PM, srinivas koniki wrote:
> >
> > Hi, I'm still seeing the same issue with spring boot. Code is
> > below, sorry code is in groovy and not fully baked. Just have
> > single processor. It worked well with single partition. But when i
> > increased the partitions, started seeing the error as in this
> > kafka-4344.
> >
> >
> > import com.codahale.metrics.MetricRegistry import
> > org.apache.kafka.clients.consumer.ConsumerConfig import
> > org.apache.kafka.clients.producer.KafkaProducer import
> > org.apache.kafka.clients.producer.ProducerRecord import
> > org.apache.kafka.common.serialization.Serdes import
> > org.apache.kafka.streams.KafkaStreams import
> > org.apache.kafka.streams.StreamsConfig import
> > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster
> > import org.apache.kafka.streams.processor.AbstractProcessor import
> > org.apache.kafka.streams.processor.ProcessorSupplier import
> > org.apache.kafka.streams.processor.TopologyBuilder import
> > org.aspectj.lang.ProceedingJoinPoint import
> > org.aspectj.lang.annotation.AfterReturning import
> > org.aspectj.lang.annotation.Around import
> > org.aspectj.lang.annotation.Aspect import
> > org.aspectj.lang.annotation.Pointcut import
> > org.springframework.beans.factory.annotation.Autowired import
> > org.springframework.beans.factory.annotation.Value import
> > org.springframework.boot.actuate.metrics.CounterService import
> > org.springframework.boot.actuate.metrics.GaugeService import
> > org.springframework.boot.autoconfigure.SpringBootApplication import
> > org.springframework.boot.test.context.SpringBootTest import
> > org.springframework.context.Lifecycle import
> > org.springframework.context.annotation.Bean import
> > org.springframework.context.annotation.Configuration import
> > org.springframework.context.annotation.Import import
> > org.springframework.context.support.PropertySourcesPlaceholderConfigur
> er
> >
> >
> import org.springframework.stereotype.Component
> > import org.springframework.test.context.ContextConfiguration import
> > org.springframework.util.StopWatch import spock.lang.Shared import
> > spock.lang.Specification
> >
> > import java.util.concurrent.Future import
> > java.util.stream.IntStream
> >
> > /** * Created by srinivas.koniki on 11/5/16. */
> > @ContextConfiguration(classes=[TestConfig, MetricsAspect,
> > RiakService]) @SpringBootTest(webEnvironment =
> > SpringBootTest.WebEnvironment.RANDOM_PORT) class MetricsSpec
> > extends Specification{
> >
> > static String kafkaTopic = 'testTopic'
> >
> > @Shared TestConfig testConfigRef
> >
> > @Autowired TestConfig testConfig
> >
> > @Autowired MetricRegistry metricRegistry
> >
> > @Autowired KafkaProducer kafkaProducer
> >
> > @Shared static final EmbeddedKafkaCluster CLUSTER = new
> > EmbeddedKafkaCluster(1)
> >
> > def setupSpec() { println("Heavy init for all the tests...")
> > CLUSTER.start()
> > System.setProperty('broker.url',CLUSTER.bootstrapServers())
> > System.setProperty('zk.url',CLUSTER.zKConnectString())
> > System.setProperty('kafka.topic',kafkaTopic)
> > CLUSTER.createTopic(kafkaTopic, 3, 1) }
> >
> > def cleanupSpec() { testConfigRef.stop() CLUSTER.stop() }
> >
> > def "Test send and receive" (){ expect: testConfig != null
> > metricRegistry != null println ''+metricRegistry.getGauges()
> >
> > when: testConfigRef = testConfig testConfig.start() List
> > futureList = new 

Re: Kafka Streams fails permanently when used with an unstable network

2016-11-04 Thread saiprasad mishra
Hi Eno
Thanks for the JIRA info.
The change looks worth trying. Will let you know after I try it out.

Regards
Sai


On Wed, Nov 2, 2016 at 1:33 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Sai,
>
> For your second note on rebalancing taking a long time, we have just
> improved the situation in trunk after fixing this JIRA:
> https://issues.apache.org/jira/browse/KAFKA-3559 <
> https://issues.apache.org/jira/browse/KAFKA-3559>. Feel free to give it a
> go if rebalancing time continues to be a problem.
>
> Thanks
> Eno
>
> > On 31 Oct 2016, at 19:44, saiprasad mishra <saiprasadmis...@gmail.com>
> wrote:
> >
> > Hey Guys
> >
> > I have noticed similar issues when the network goes down while a Kafka
> > Streams app is starting, especially when the store has initialized but
> > the task initialization is not yet complete; when the network comes back,
> > the rebalance fails with the above error and I had to restart. I run many
> > partitions and have many tasks getting initialized.
> >
> > Otherwise, if the Kafka Streams app is started successfully, it does
> > recover from network issues, as far as I have seen so far, and the stores
> > do remain available.
> >
> > Which means some of these initialization exceptions could be categorized
> > as recoverable and should always be retried.
> >
> > I think task 0_0 in your case was not initialized properly in the first
> > place, and then the rebalance happened because of network connectivity,
> > and that resulted in the above exception.
> >
> > On a separate note, rebalance takes a long time, as I have some
> > intermediary topics, and I think it might be worse if the network is
> > slow; I was thinking of something like making the store available for
> > querying quickly, without waiting for the full initialization of tasks.
> >
> > Regards
> > Sai
> >
> >
> >
> >
> >
> >
> >
> > On Mon, Oct 31, 2016 at 3:51 AM, Damian Guy <damian@gmail.com>
> wrote:
> >
> >> Hi Frank,
> >>
> >> This usually means that another StreamThread has the lock for the state
> >> directory. So it would seem that one of the StreamThreads hasn't shut
> down
> >> cleanly. If it happens again can you please take a Thread Dump so we can
> >> see what is happening?
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Sun, 30 Oct 2016 at 10:52 Frank Lyaruu <flya...@gmail.com> wrote:
> >>
> >>> I have a remote Kafka cluster, to which I connect using a VPN and a
> >>> not-so-great WiFi network.
> >>> That means that sometimes the Kafka Client loses briefly loses
> >>> connectivity.
> >>> When it regains a connection after a while, I see:
> >>>
> >>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> >> be
> >>> completed since the group has already rebalanced and assigned the
> >>> partitions to another member. This means that the time between
> subsequent
> >>> calls to poll() was longer than the configured max.poll.interval.ms,
> >> which
> >>> typically implies that the poll loop is spending too much time message
> >>> processing. You can address this either by increasing the session
> timeout
> >>> or by reducing the maximum size of batches returned in poll() with
> >>> max.poll.records.
> >>>
> >>> ...
> >>>
> >>> Which makes sense I suppose, but this shouldn't be fatal.
> >>>
> >>> But then I see:
> >>> [StreamThread-1] ERROR
> >>> org.apache.kafka.streams.processor.internals.StreamThread -
> >> stream-thread
> >>> [StreamThread-1] Failed to create an active task %s:
> >>> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0]
> >> Error
> >>> while creating the state manager
> >>>
> >>> at
> >>>
> >>> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(
> >> AbstractTask.java:72)
> >>> at
> >>>
> >>> org.apache.kafka.streams.processor.internals.
> >> StreamTask.<init>(StreamTask.java:89)
> >>> at
> >>>
> >>> org.apache.kafka.streams.processor.internals.
> >> StreamThread.createStreamTask(StreamThread.java:633)
> >>> at
> >>>
> >>> org.apache.kafka.streams.processor.internals.
> &g

Re: Kafka Streams fails permanently when used with an unstable network

2016-10-31 Thread saiprasad mishra
Hey Guys

I have noticed similar issues when the network goes down while a Kafka
Streams app is starting, especially when the store has initialized but the
task initialization is not yet complete; when the network comes back, the
rebalance fails with the above error and I had to restart. I run many
partitions and have many tasks getting initialized.

Otherwise, if the Kafka Streams app is started successfully, it does recover
from network issues, as far as I have seen so far, and the stores do remain
available.

Which means some of these initialization exceptions could be categorized as
recoverable and should always be retried.

I think task 0_0 in your case was not initialized properly in the first
place, and then the rebalance happened because of network connectivity, and
that resulted in the above exception.

On a separate note, rebalance takes a long time, as I have some intermediary
topics, and I think it might be worse if the network is slow; I was thinking
of something like making the store available for querying quickly, without
waiting for the full initialization of tasks.
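
As a reference point, the CommitFailedException quoted further down suggests
increasing the session timeout or reducing max.poll.records. A rough sketch
of passing those through the streams configuration (the values here are only
placeholders, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// application.id, bootstrap.servers, serdes, etc. go here as usual
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");  // give slow networks more room
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");      // smaller batches per poll()
// then pass props to new KafkaStreams(builder, new StreamsConfig(props))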

Regards
Sai







On Mon, Oct 31, 2016 at 3:51 AM, Damian Guy  wrote:

> Hi Frank,
>
> This usually means that another StreamThread has the lock for the state
> directory. So it would seem that one of the StreamThreads hasn't shut down
> cleanly. If it happens again can you please take a Thread Dump so we can
> see what is happening?
>
> Thanks,
> Damian
>
> On Sun, 30 Oct 2016 at 10:52 Frank Lyaruu  wrote:
>
> > I have a remote Kafka cluster, to which I connect using a VPN and a
> > not-so-great WiFi network.
> > That means that sometimes the Kafka Client loses briefly loses
> > connectivity.
> > When it regains a connection after a while, I see:
> >
> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> be
> > completed since the group has already rebalanced and assigned the
> > partitions to another member. This means that the time between subsequent
> > calls to poll() was longer than the configured max.poll.interval.ms,
> which
> > typically implies that the poll loop is spending too much time message
> > processing. You can address this either by increasing the session timeout
> > or by reducing the maximum size of batches returned in poll() with
> > max.poll.records.
> >
> > ...
> >
> > Which makes sense I suppose, but this shouldn't be fatal.
> >
> > But then I see:
> > [StreamThread-1] ERROR
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [StreamThread-1] Failed to create an active task %s:
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_0]
> Error
> > while creating the state manager
> >
> > at
> >
> > org.apache.kafka.streams.processor.internals.AbstractTask.<init>(
> AbstractTask.java:72)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.<init>(StreamTask.java:89)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.createStreamTask(StreamThread.java:633)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.addStreamTasks(StreamThread.java:660)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.access$100(
> StreamThread.java:69)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread$1.
> onPartitionsAssigned(StreamThread.java:124)
> > at
> >
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> onJoinComplete(ConsumerCoordinator.java:228)
> > at
> >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:313)
> > at
> >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:277)
> > at
> >
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:259)
> > at
> >
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1013)
> > at
> >
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:979)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:407)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
> >
> > Caused by: java.io.IOException: task [0_0] Failed to lock the state
> > directory:
> >
> > /Users/frank/git/dexels.repository/com.dexels.kafka.
> streams/kafka-streams/develop3-person/0_0
> >
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.<init>(ProcessorStateManager.java:101)
> > at
> >
> > org.apache.kafka.streams.processor.internals.AbstractTask.<init>(
> AbstractTask.java:69)
> >
> > ... 13 more
> >
> > And my stream applications is dead.
> >
> > So I'm guessing that either the store wasn't closed properly or some
> things
> > happen out of order.
> >
> > Any ideas?
> >
> > I'm using the trunk of Kafka 0.10.2.0-SNAPSHOT, Java 1.8.0_66 on MacOS
> > 10.11.6
> >
> > 

Re: issue with custom processor flush to rocksdb store

2016-10-26 Thread saiprasad mishra
Yes, this is similar, meaning it was all about KafkaStreams not being started
correctly in my Spring app and NOT a bug in KafkaStreams.
In the comments on the JIRA I have described what I was doing wrong.

These types of exceptions largely indicate that Kafka Streams was not started
correctly.

Thanks for your valuable time on this
Regards
Sai

On Wed, Oct 26, 2016 at 2:34 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Is it a similar report as https://issues.apache.org/jira/browse/KAFKA-4344
> ?
>
> On Tue, Oct 25, 2016 at 2:43 PM, saiprasad mishra <
> saiprasadmis...@gmail.com
> > wrote:
>
> > Hi
> > This is with Kafka Streams version 0.10.1.0 (server running remotely and
> > the streams app running locally on my laptop).
> >
> >
> >
> > I have a kafka stream pipeline like this
> >
> > source topic(with 10 partitions) stream -> filter for null value ->map to
> > make it keyed by id ->custom processor to mystore(persistent)
> >
> > I am getting the below exception. This happens when the flush happens.
> > If I restart the app, the data I sent is actually present in the RocksDB
> > store.
> > I see the message of the keyed stream went to partition 0, where the
> > flush happened correctly I guess; as seen below, the partition 9 task
> > failed to flush. Not sure about the complaint about timestamp() here.
> >
> > Can somebody explain what this means?
> >
> >
> > Not sure if it has something to do with the timestamp extractor property
> > below that I am setting, or some other time like the producer create time?
> >
> > props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> > ConsumerRecordTimestampExtractor.class);
> >
> >
> > Regards
> > Sai
> >
> >
> > 2016-10-25 14:31:29.822000
> > org.apache.kafka.streams.processor.internals.StreamThread StreamThread-1
> > ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_9
> state:
> >
> >
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_9]
> Failed
> > to flush state store Products
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> > ProcessorStateManager.java:331)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.
> > java:275)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> > StreamThread.java:576)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> > StreamThread.java:562)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> > StreamThread.java:538)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:456)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:242)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > Caused by: java.lang.IllegalStateException: This should not happen as
> > timestamp() should only be called while a record is processed
> >
> > at
> > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.
> > timestamp(ProcessorContextImpl.java:192)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> > StoreChangeLogger.java:112)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.state.internals.RocksDBStore.
> > flush(RocksDBStore.java:375)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(
> > MeteredKeyValueStore.java:175)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> > ProcessorStateManager.java:329)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > ... 6 more
> >
>
>
>
> --
> -- Guozhang
>


issue with custom processor flush to rocksdb store

2016-10-25 Thread saiprasad mishra
Hi
This is with Kafka Streams version 0.10.1.0 (server running remotely and the
streams app running locally on my laptop).



I have a kafka stream pipeline like this

source topic(with 10 partitions) stream -> filter for null value ->map to
make it keyed by id ->custom processor to mystore(persistent)
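
In DSL terms, that pipeline is roughly the following (a sketch only; the
topic name, the key-derivation logic via extractId(), and the MyProcessor
class are placeholders for the real ones):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();

// the persistent store used by the custom processor
StateStoreSupplier myStore = Stores.create("mystore")
        .withKeys(stringSerde).withValues(stringSerde).persistent().build();
builder.addStateStore(myStore);

builder.stream(stringSerde, stringSerde, "source-topic")              // 10 partitions
       .filter((key, value) -> value != null)                         // filter for null value
       .map((key, value) -> new KeyValue<>(extractId(value), value))  // key by id (extractId is a placeholder)
       .process(MyProcessor::new, "mystore");                         // custom processor writing to mystore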

I am getting the below exception. This happens when the flush happens.
If I restart the app, the data I sent is actually present in the RocksDB store.
I see the message of the keyed stream went to partition 0, where the flush
happened correctly I guess; as seen below, the partition 9 task failed to
flush. Not sure about the complaint about timestamp() here.

Can somebody explain what this means?


Not sure if it has something to do with the timestamp extractor property below
that I am setting, or some other time like the producer create time?

props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
ConsumerRecordTimestampExtractor.class);


Regards
Sai


2016-10-25 14:31:29.822000
org.apache.kafka.streams.processor.internals.StreamThread StreamThread-1
ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_9 state:


org.apache.kafka.streams.errors.ProcessorStateException: task [0_9] Failed
to flush state store Products

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331)
~[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:275)
~[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576)
[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562)
[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538)
[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456)
[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
[kafka-streams-0.10.1.0.jar!/:?]

Caused by: java.lang.IllegalStateException: This should not happen as
timestamp() should only be called while a record is processed

at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:192)
~[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:112)
~[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:375)
~[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:175)
~[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329)
~[kafka-streams-0.10.1.0.jar!/:?]

... 6 more


Re: Exception when accessing partition, offset and timestamp in processor class

2016-10-25 Thread saiprasad mishra
Just created the JIRA

https://issues.apache.org/jira/browse/KAFKA-4344

Regards
Sai

On Tue, Oct 25, 2016 at 11:59 AM, saiprasad mishra <
saiprasadmis...@gmail.com> wrote:

> My JIRA id is saimishra
>
> Regards
> Sai
>
> On Tue, Oct 25, 2016 at 10:55 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> -BEGIN PGP SIGNED MESSAGE-
>> Hash: SHA512
>>
>> What is your JIRA ID? We can add you to the contributor list to give
>> you permission.
>>
>> - -Matthias
>>
>>
>> On 10/25/16 10:48 AM, saiprasad mishra wrote:
>> > Hi Matthias Thanks for the reply. I think I don't have permission
>> > for this. If you can grant me permission I can create one (my
>> > handle is saimishra). Or you can go ahead and create one
>> >
>> > I may need permission to create JIRA as I might report more issues
>> > after discussing with you over here.
>> >
>> > Regards Sai
>> >
>> > On Tue, Oct 25, 2016 at 10:26 AM, Matthias J. Sax
>> > <matth...@confluent.io> wrote:
>> >
>> > Hi,
>> >
>> > sorry for late reply. Seems like a bug to me; within
>> > Processor#process() accessing the context should work. Can you open
>> > a JIRA for it?
>> >
>> > -Matthias
>> >
>> > On 10/23/16 10:28 PM, saiprasad mishra wrote:
>> >>>> Sorry for the email again
>> >>>>
>> >>>> I was expecting it to work always when accessed from
>> >>>> process() method as this corresponds to each kafka
>> >>>> message/record processing. I understand illegalstate by the
>> >>>> time punctuate() is called as its already batched by time
>> >>>> interval
>> >>>>
>> >>>> Regards Sai
>> >>>>
>> >>>> On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra
>> >>>> <saiprasadmis...@gmail.com
>> >>>>> wrote:
>> >>>>
>> >>>>> Hi
>> >>>>>
> >>>>> This is with my streaming app on Kafka 0.10.1.0.
>> >>>>>
>> >>>>> My flow looks something like below
>> >>>>>
>> >>>>> source topic stream -> filter for null value ->map to make
>> >>>>> it keyed by id ->custom processor to mystore -> to another
>> >>>>> topic -> ktable
>> >>>>>
>> >>>>> I am hitting the below type of exception in a custom
>> >>>>> processor class if I try to access offset() or partition()
>> >>>>> or timestamp() from the ProcessorContext in the process()
>> >>>>> method. I was hoping it would return the partition and
>> >>>>> offset for the enclosing topic(in this case source topic)
>> >>>>> where its consuming from or -1 based on the api docs.
>> >>>>>
>> >>>>> Looks like only in certain cases it is accessible. is it
>> >>>>> getting lost in transformation phases.
>> >>>>>
>> >>>>> Same issue happens on if i try to access them in
>> >>>>> punctuate() method but some where I saw that it might not
>> >>>>> work in punctuate(). Any reason for this or any link
>> >>>>> describing this will be helpful
>> >>>>>
>> >>>>>
>> >>>>> ==
>> ==
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >
>> >>>>>
>> java.lang.IllegalStateException: This should not happen as offset()
>> >>>>> should only be called while a record is processed at
>> >>>>> org.apache.kafka.streams.processor.internals.
>> >>>>> ProcessorContextImpl.offset(ProcessorContextImpl.java:181)
>> >>>>> ~[kafka-streams-0.10.1.0.jar!/:?] at
>> >>>>> com.sai.repo.MyStore.process(MyStore.java:72)
>> >>>>> ~[classes!/:?] at
>> >>>>> com.sai.repo.MyStore.process(MyStore.java:39)
>> >>>>> ~[classes!/:?] at
>> >>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.process
>> (Pr
>> >
>> >>>>>
>> ocessorNode

Re: Exception when accessing partition, offset and timestamp in processor class

2016-10-25 Thread saiprasad mishra
My JIRA id is saimishra

Regards
Sai

On Tue, Oct 25, 2016 at 10:55 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> -BEGIN PGP SIGNED MESSAGE-
> Hash: SHA512
>
> What is your JIRA ID? We can add you to the contributor list to give
> you permission.
>
> - -Matthias
>
>
> On 10/25/16 10:48 AM, saiprasad mishra wrote:
> > Hi Matthias Thanks for the reply. I think I don't have permission
> > for this. If you can grant me permission I can create one (my
> > handle is saimishra). Or you can go ahead and create one
> >
> > I may need permission to create JIRA as I might report more issues
> > after discussing with you over here.
> >
> > Regards Sai
> >
> > On Tue, Oct 25, 2016 at 10:26 AM, Matthias J. Sax
> > <matth...@confluent.io> wrote:
> >
> > Hi,
> >
> > sorry for late reply. Seems like a bug to me; within
> > Processor#process() accessing the context should work. Can you open
> > a JIRA for it?
> >
> > -Matthias
> >
> > On 10/23/16 10:28 PM, saiprasad mishra wrote:
> >>>> Sorry for the email again
> >>>>
> >>>> I was expecting it to work always when accessed from
> >>>> process() method as this corresponds to each kafka
> >>>> message/record processing. I understand illegalstate by the
> >>>> time punctuate() is called as its already batched by time
> >>>> interval
> >>>>
> >>>> Regards Sai
> >>>>
> >>>> On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra
> >>>> <saiprasadmis...@gmail.com
> >>>>> wrote:
> >>>>
> >>>>> Hi
> >>>>>
> >>>>> This is with my streaming app on Kafka 0.10.1.0.
> >>>>>
> >>>>> My flow looks something like below
> >>>>>
> >>>>> source topic stream -> filter for null value ->map to make
> >>>>> it keyed by id ->custom processor to mystore -> to another
> >>>>> topic -> ktable
> >>>>>
> >>>>> I am hitting the below type of exception in a custom
> >>>>> processor class if I try to access offset() or partition()
> >>>>> or timestamp() from the ProcessorContext in the process()
> >>>>> method. I was hoping it would return the partition and
> >>>>> offset for the enclosing topic(in this case source topic)
> >>>>> where its consuming from or -1 based on the api docs.
> >>>>>
> >>>>> Looks like only in certain cases it is accessible. is it
> >>>>> getting lost in transformation phases.
> >>>>>
> >>>>> Same issue happens on if i try to access them in
> >>>>> punctuate() method but some where I saw that it might not
> >>>>> work in punctuate(). Any reason for this or any link
> >>>>> describing this will be helpful
> >>>>>
> >>>>>
> >>>>> ==
> ==
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >
> >>>>>
> java.lang.IllegalStateException: This should not happen as offset()
> >>>>> should only be called while a record is processed at
> >>>>> org.apache.kafka.streams.processor.internals.
> >>>>> ProcessorContextImpl.offset(ProcessorContextImpl.java:181)
> >>>>> ~[kafka-streams-0.10.1.0.jar!/:?] at
> >>>>> com.sai.repo.MyStore.process(MyStore.java:72)
> >>>>> ~[classes!/:?] at
> >>>>> com.sai.repo.MyStore.process(MyStore.java:39)
> >>>>> ~[classes!/:?] at
> >>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.process
> (Pr
> >
> >>>>>
> ocessorNode.java:82)
> >>>>>
> >>>>>
> >>>>>
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >>>>> at org.apache.kafka.streams.processor.internals.
> >>>>> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >>>>>
> >>>>>
> ~[kafka-streams-0.10.1.0.jar!/:?] at
> >>>>> org.apache.kafka.streams.kstream.internals.KStreamMap$
> >>>>> KStreamMapProcessor.process(KStreamMap.java:43)
> >>>>> ~[kafka-streams-0.10.1

Re: Exception when accessing partition, offset and timestamp in processor class

2016-10-25 Thread saiprasad mishra
Hi Matthias
Thanks for the reply.
I think I don't have permission for this.
If you can grant me permission I can create one (my handle is saimishra).
Or you can go ahead and create one

I may need permission to create JIRA as I might report more issues after
discussing with you over here.

Regards
Sai

On Tue, Oct 25, 2016 at 10:26 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> -BEGIN PGP SIGNED MESSAGE-
> Hash: SHA512
>
> Hi,
>
> sorry for late reply. Seems like a bug to me; within
> Processor#process() accessing the context should work. Can you open a
> JIRA for it?
>
> - -Matthias
>
> On 10/23/16 10:28 PM, saiprasad mishra wrote:
> > Sorry for the email again
> >
> > I was expecting it to work always when accessed from process()
> > method as this corresponds to each kafka message/record processing.
> > I understand illegalstate by the time punctuate() is called as its
> > already batched by time interval
> >
> > Regards Sai
> >
> > On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra
> > <saiprasadmis...@gmail.com
> >> wrote:
> >
> >> Hi
> >>
> >> This is with my streaming app on Kafka 0.10.1.0.
> >>
> >> My flow looks something like below
> >>
> >> source topic stream -> filter for null value ->map to make it
> >> keyed by id ->custom processor to mystore -> to another topic ->
> >> ktable
> >>
> >> I am hitting the below type of exception in a custom processor
> >> class if I try to access offset() or partition() or timestamp()
> >> from the ProcessorContext in the process() method. I was hoping
> >> it would return the partition and offset for the enclosing
> >> topic(in this case source topic) where its consuming from or -1
> >> based on the api docs.
> >>
> >> Looks like only in certain cases it is accessible. is it getting
> >> lost in transformation phases.
> >>
> >> Same issue happens on if i try to access them in punctuate()
> >> method but some where I saw that it might not work in
> >> punctuate(). Any reason for this or any link describing this
> >> will be helpful
> >>
> >>
> >> 
> >>
> >>
> >>
> >>
> java.lang.IllegalStateException: This should not happen as offset()
> >> should only be called while a record is processed at
> >> org.apache.kafka.streams.processor.internals.
> >> ProcessorContextImpl.offset(ProcessorContextImpl.java:181)
> >> ~[kafka-streams-0.10.1.0.jar!/:?] at
> >> com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?] at
> >> com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?] at
> >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(Pr
> ocessorNode.java:82)
> >>
> >>
> >>
> ~[kafka-streams-0.10.1.0.jar!/:?]
> >> at org.apache.kafka.streams.processor.internals.
> >> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >> ~[kafka-streams-0.10.1.0.jar!/:?] at
> >> org.apache.kafka.streams.kstream.internals.KStreamMap$
> >> KStreamMapProcessor.process(KStreamMap.java:43)
> >> ~[kafka-streams-0.10.1.0.jar!/:?] at
> >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(Pr
> ocessorNode.java:82)
> >>
> >>
> >>
> ~[kafka-streams-0.10.1.0.jar!/:?]
> >> at org.apache.kafka.streams.processor.internals.
> >> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >> ~[kafka-streams-0.10.1.0.jar!/:?] at
> >> org.apache.kafka.streams.kstream.internals.KStreamFilter$
> >> KStreamFilterProcessor.process(KStreamFilter.java:44)
> >> ~[kafka-streams-0.10.1.0.jar!/:?] at
> >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(Pr
> ocessorNode.java:82)
> >>
> >>
> >>
> ~[kafka-streams-0.10.1.0.jar!/:?]
> >> at org.apache.kafka.streams.processor.internals.
> >> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >> ~[kafka-streams-0.10.1.0.jar!/:?] at
> >> org.apache.kafka.streams.processor.internals.
> >> SourceNode.process(SourceNode.java:66)
> >> ~[kafka-streams-0.10.1.0.jar!/:?] at
> >> org.apache.kafka.streams.processor.internals.
> >> StreamTask.process(StreamTask.java:181)
> >> ~[kafka-streams-0.10.1.0.jar!/:?] at
> >> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(Str
> eamThread.java:436)
> >>
> 

Re: Exception when accessing partition, offset and timestamp in processor class

2016-10-23 Thread saiprasad mishra
Sorry for the email again

I was expecting it to always work when accessed from the process() method, as
that corresponds to processing each Kafka message/record.
I understand the illegal state by the time punctuate() is called, as the
records are already batched by time interval.

Regards
Sai

On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra <saiprasadmis...@gmail.com
> wrote:

> Hi
>
> > This is with my streaming app on Kafka 0.10.1.0.
>
> My flow looks something like below
>
> source topic stream -> filter for null value ->map to make it keyed by id
> ->custom processor to mystore -> to another topic -> ktable
>
> I am hitting the below type of exception in a custom processor class if I
> try to access offset() or partition() or timestamp() from the
> ProcessorContext in the process() method. I was hoping it would return the
> partition and offset for the enclosing topic(in this case source topic)
> where its consuming from or -1 based on the api docs.
>
> Looks like only in certain cases it is accessible. is it getting lost in
> transformation phases.
>
> Same issue happens on if i try to access them in punctuate() method but
> some where I saw that it might not work in punctuate(). Any reason for this
> or any link describing this will be helpful
>
>
> 
>
> java.lang.IllegalStateException: This should not happen as offset()
> should only be called while a record is processed
> at org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.offset(ProcessorContextImpl.java:181)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
> at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.kstream.internals.KStreamMap$
> KStreamMapProcessor.process(KStreamMap.java:43)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.kstream.internals.KStreamFilter$
> KStreamFilterProcessor.process(KStreamFilter.java:44)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.
> SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> ~[kafka-streams-0.10.1.0.jar!/:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
> =
>
>
> Regards
> Sai
>


Exception when accessing partition, offset and timestamp in processor class

2016-10-23 Thread saiprasad mishra
Hi

This is with my streaming app on Kafka 0.10.1.0.

My flow looks something like below

source topic stream -> filter for null value ->map to make it keyed by id
->custom processor to mystore -> to another topic -> ktable

I am hitting the below type of exception in a custom processor class if I
try to access offset() or partition() or timestamp() from the
ProcessorContext in the process() method. I was hoping it would return the
partition and offset of the enclosing topic (in this case the source topic)
it is consuming from, or -1, based on the API docs.

Looks like it is only accessible in certain cases. Is it getting lost in the
transformation phases?

The same issue happens if I try to access them in the punctuate() method, but
somewhere I saw that it might not work in punctuate(). Any reason for this,
or any link describing this, would be helpful.
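
For reference, a minimal sketch of the kind of processor code that hits this
(the class name matches the MyStore frame in the trace below; the body itself
is made up):

import org.apache.kafka.streams.processor.AbstractProcessor;

public class MyStore extends AbstractProcessor<String, String> {

    @Override
    public void process(String key, String value) {
        // each of these throws the IllegalStateException below when the
        // record context has not been set on the processor context
        long offset = context().offset();
        int partition = context().partition();
        long timestamp = context().timestamp();

        System.out.println("record from partition " + partition
                + " offset " + offset + " timestamp " + timestamp);
        context().forward(key, value);
    }
}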




java.lang.IllegalStateException: This should not happen as offset() should
only be called while a record is processed
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181)
~[kafka-streams-0.10.1.0.jar!/:?]
at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
~[kafka-streams-0.10.1.0.jar!/:?]
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
~[kafka-streams-0.10.1.0.jar!/:?]
at
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
~[kafka-streams-0.10.1.0.jar!/:?]
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
~[kafka-streams-0.10.1.0.jar!/:?]
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
~[kafka-streams-0.10.1.0.jar!/:?]
at
org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
~[kafka-streams-0.10.1.0.jar!/:?]
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
~[kafka-streams-0.10.1.0.jar!/:?]
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
~[kafka-streams-0.10.1.0.jar!/:?]
at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
~[kafka-streams-0.10.1.0.jar!/:?]
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
~[kafka-streams-0.10.1.0.jar!/:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
~[kafka-streams-0.10.1.0.jar!/:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
[kafka-streams-0.10.1.0.jar!/:?]
=


Regards
Sai


Re: kafka streams metadata request fails on 0.10.0.1 broker/topic from 0.10.1.0 client

2016-10-19 Thread saiprasad mishra
Thanks Michael
Hopefully the upgrade story evolves as 0.10.1+ advances to maturity

Just my 2 cents

Decoupling Kafka Streams from the core Kafka changes would help, so that the
broker can be upgraded without notice and streaming apps can evolve to newer
streaming features at their own pace.

Regards
Sai


On Wednesday, October 19, 2016, Michael Noll <mich...@confluent.io> wrote:

> Apps built with Kafka Streams 0.10.1 only work against Kafka clusters
> running 0.10.1+.  This explains your error message above.
>
> Unfortunately, Kafka's current upgrade story means you need to upgrade your
> cluster in this situation.  Moving forward, we're planning to improve the
> upgrade/compatibility story of Kafka so that you could, for example, run a
> newer version of Kafka Streams (or any other Kafka client) against an older
> version of Kafka.
>
>
>
> On Tue, Oct 18, 2016 at 10:56 PM, saiprasad mishra <
> saiprasadmis...@gmail.com> wrote:
>
> > Hi All
> >
> > Was testing with 0.10.1.0 rc3 build for my new streams app
> >
> > Seeing issues starting my Kafka Streams app (0.10.1.0) against the old
> > version broker 0.10.0.1. I don't know if it is supposed to work as is. I
> > will upgrade the broker to the same version and see whether it goes away.
> >
> > client side issues
> >
> > ==
> >
> > java.io.EOFException
> >
> > at
> > org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(
> > NetworkReceive.java:83)
> > ~[kafka-clients-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.common.network.NetworkReceive.
> > readFrom(NetworkReceive.java:71)
> > ~[kafka-clients-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.common.network.KafkaChannel.receive(
> > KafkaChannel.java:154)
> > ~[kafka-clients-0.10.1.0.jar!/:?]
> >
> > at org.apache.kafka.common.network.KafkaChannel.read(
> > KafkaChannel.java:135)
> > ~[kafka-clients-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.
> > java:343)
> > [kafka-clients-0.10.1.0.jar!/:?]
> >
> > at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
> > [kafka-clients-0.10.1.0.jar!/:?]
> >
> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> > [kafka-clients-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> > ConsumerNetworkClient.java:232)
> > [kafka-clients-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> > ConsumerNetworkClient.java:209)
> > [kafka-clients-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> > awaitMetadataUpdate(ConsumerNetworkClient.java:148)
> > [kafka-clients-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> > awaitMetadataUpdate(ConsumerNetworkClient.java:136)
> > [kafka-clients-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > ensureCoordinatorReady(AbstractCoordinator.java:197)
> > [kafka-clients-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> > ConsumerCoordinator.java:248)
> > [kafka-clients-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> > pollOnce(KafkaConsumer.java:1013)
> > [kafka-clients-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > KafkaConsumer.java:979)
> > [kafka-clients-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:407)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:242)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> >
> >
> > On the broker side the following message appears
> >
> > =
> >
> > kafka.network.InvalidRequestException: Error getting request for
> apiKey: 3
> > and apiVersion: 2
> >
> > at
> > kafka.network.RequestChannel$Request.liftedTree2$1(
> > RequestChannel.scala:95)
> >
> > at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:87)
> >
> > at
> > kafka.network.Processor$$anonfun$processCompletedRece

kafka streams metadata request fails on 0.10.0.1 broker/topic from 0.10.1.0 client

2016-10-18 Thread saiprasad mishra
Hi All

Was testing with 0.10.1.0 rc3 build for my new streams app

Seeing issues starting my Kafka Streams app (0.10.1.0) against the old
version broker 0.10.0.1. I don't know if it is supposed to work as is. I will
upgrade the broker to the same version and see whether it goes away.

client side issues

==

java.io.EOFException

at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
~[kafka-clients-0.10.1.0.jar!/:?]

at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
~[kafka-clients-0.10.1.0.jar!/:?]

at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
~[kafka-clients-0.10.1.0.jar!/:?]

at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
~[kafka-clients-0.10.1.0.jar!/:?]

at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
[kafka-clients-0.10.1.0.jar!/:?]

at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
[kafka-clients-0.10.1.0.jar!/:?]

at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
[kafka-clients-0.10.1.0.jar!/:?]

at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
[kafka-clients-0.10.1.0.jar!/:?]

at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
[kafka-clients-0.10.1.0.jar!/:?]

at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
[kafka-clients-0.10.1.0.jar!/:?]

at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:136)
[kafka-clients-0.10.1.0.jar!/:?]

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:197)
[kafka-clients-0.10.1.0.jar!/:?]

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:248)
[kafka-clients-0.10.1.0.jar!/:?]

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
[kafka-clients-0.10.1.0.jar!/:?]

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
[kafka-clients-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
[kafka-streams-0.10.1.0.jar!/:?]



On the broker side the following message appears

=

kafka.network.InvalidRequestException: Error getting request for apiKey: 3
and apiVersion: 2

at
kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:95)

at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:87)

at
kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)

at
kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)

at scala.collection.Iterator$class.foreach(Iterator.scala:893)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)

at kafka.network.Processor.run(SocketServer.scala:413)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.IllegalArgumentException: Invalid version for API key
3: 2

at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:31)

at
org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:44)

at
org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:60)

at
org.apache.kafka.common.requests.MetadataRequest.parse(MetadataRequest.java:96)

at
org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:48)

at
kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:92)

Regards

Sai


Re: Kafka consumer does not work from eclipse when kafka running inside VM

2015-06-29 Thread saiprasad mishra
I have seen the ZK consumers directory get corrupted from running your
consumer again and again with some settings changed, which causes this issue.
I would remove the consumer group's directory from the Kafka server's
ZooKeeper, like below:

rmr /consumers/hello (then exit the ZooKeeper shell)

Then run your consumer and it should always work cleanly, as expected.
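
For completeness, with the old high-level consumer, reading a topic from the
beginning needs a group.id that has no committed offsets in ZooKeeper plus
auto.offset.reset=smallest; a rough sketch of the relevant properties (the
values are illustrative only):

import java.util.Properties;

Properties props = new Properties();
props.put("zookeeper.connect", "192.168.1.12:2181");
props.put("group.id", "hello");              // clear /consumers/hello first, or use a fresh group id
props.put("auto.offset.reset", "smallest");  // only applies when the group has no committed offset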


Regards
Sai

On Mon, Jun 29, 2015 at 10:10 AM, JIEFU GONG jg...@berkeley.edu wrote:

 Yes, try this command replacing the bracketed things with the correct
 identification for your project:

 *bin/kafka-console-consumer.sh --zookeeper [zookeeper info] --topic
 [your-topic-here] --from-beginning*

 On Mon, Jun 29, 2015 at 9:56 AM, shivam tiwari bigbang...@gmail.com
 wrote:

  It turns out that the code mentioned does not read the topic from the
  beginning and actually waits for the producer to produce something fresh.
  After starting the producer and sending messages I was able to retrieve all
  the messages. Is there a way to get messages on a topic from the beginning?
 
  On 26 June 2015 at 14:32, Gwen Shapira gshap...@cloudera.com wrote:
 
   Zookeeper actually doesn't show any errors - it shows a warning, which
   is pretty normal.
  
   What does your consumer and Kafka broker show? Are there any errors in
   the consumer? Or is it just hanging?
  
   You may want to consult our FAQ:
  
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata
   ?
  
   Gwen
  
   On Fri, Jun 26, 2015 at 11:00 AM, shivam tiwari bigbang...@gmail.com
   wrote:
Kafka 0.8.2.2.3 and zookeper both are running inside VM. I was able
 to
   run
both producer and consumer within VM successfully using
kafka-console-producer.sh and kafka-console-consumer.sh respectively.
   Even
I was able to consume Kafka messages from host machine using
kafka-console-consumer.sh. But when I tried to run the consumer using
   java
from eclipse then zookeeper logs following error
   
2015-06-26 03:06:26,323 - INFO
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] -
Closed socket connection for client /192.168.1.12:59549 (no session
established for client)
2015-06-26 03:07:26,225 - INFO
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197]
 -
Accepted socket connection from /192.168.1.12:59617
2015-06-26 03:07:26,226 - WARN
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] -
 caught
end of stream exception
EndOfStreamException: Unable to read additional data from client
sessionid 0x0, likely client has closed socket
at
   org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
at
  
 
 org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
at java.lang.Thread.run(Thread.java:745)
   
Below is my Kafka consume code
   
package com.truckevent.producer;
   
   
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
   
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
   
   
public class KafkaConsumer {
   
public static void main(String[] args) throws Exception {
   
String group = "hello";
   
   
Properties props = new Properties();
props.put("zookeeper.connect", "192.168.1.12:2181");
props.put("group.id", group);
props.put("zookeeper.session.timeout.ms", "2");
props.put("zookeeper.sync.time.ms", "2030");
props.put("auto.commit.interval.ms", "1");
props.put("auto.offset.reset", "smallest");
   
ConsumerConfig cf = new ConsumerConfig(props) ;
   
ConsumerConnector consumer =
   Consumer.createJavaConsumerConnector(cf) ;
   
String topic = "event";
   
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
    consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
   
   
KafkaStream<byte[], byte[]> stream = streams.get(0);
   
ConsumerIterator<byte[], byte[]> it = stream.iterator();
int i = 1 ;
while (it.hasNext()) {
   
System.out.println(i + ": " + new String(it.next().message()));
++i;
}
consumer.shutdown();
}
}
   
   
I am not sure why I am not able to consume messages from java code.
  Kafka
is running on port 6667 and zookeeper on 2181.
   
   
--
Regards
   
Shivam Tiwari
  
 
 
 
  --
  Regards
 
  Shivam Tiwari
 



 --

 Jiefu Gong
 University of California, Berkeley | Class of 2017
 B.A Computer Science |