Re: Running cluster of stream processing application

2016-12-08 Thread Sachin Mittal
Hi,
I followed the document and I have few questions.
Say I have a single-partition input topic, and say I run 2 streams
applications, one on machine1 and one on machine2.
Both applications have the same application id and identical code.
Say topic1 has messages like
(k1, v11)
(k1, v12)
(k1, v13)
(k2, v21)
(k2, v22)
(k2, v23)
When I was running a single application I was getting results like
(k1, agg(v11, v12, v13))
(k2, agg(v21, v22, v23))

Now when 2 applications are run, say the messages are read in round-robin
fashion:
v11 v13 v22 - machine 1
v12 v21 v23 - machine 2

The aggregation at machine 1 would be
(k1, agg(v11, v13))
(k2, agg(v22))

The aggregation at machine 2 would be
(k1, agg(v12))
(k2, agg(v21, v23))

So where do I join the independent results of these 2 aggregations to
get the final result as expected when a single instance was running?

Note my high-level DSL is something like
srcSTopic.aggregate(...).foreach((key, aggregation) -> {
    // process the aggregated value and push it to some external storage
});
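
For concreteness, here is a rough, self-contained sketch of the kind of topology I mean
(assuming the 0.10.1 DSL; a toy string-concatenation aggregation, and the topic, store,
and application names are made up):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class AggExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "agg-example");   // same id on both machines
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream(Serdes.String(), Serdes.String(), "topic1")
               .groupByKey(Serdes.String(), Serdes.String())
               .aggregate(() -> "",                                      // initializer
                          (key, value, agg) -> agg + "|" + value,        // toy aggregation
                          Serdes.String(), "agg-store")
               .toStream()
               .foreach((key, agg) -> {
                   // process the aggregated value and push it to some external storage
               });

        new KafkaStreams(builder, new StreamsConfig(props)).start();
    }
}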

So I want this foreach to run against the final set of aggregated
values. Do I need to add another step before the foreach to make sure the
different results from the 2 machines are joined to get the final one as
expected? If yes, what would that step be?

Thanks
Sachin






On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> Hi Sachin,
>
> Some quick answers, and a link to some documentation to read more:
>
> - If you restart the application, it will start from the point it crashed
> (possibly reprocessing a small window of records).
>
> - You can run more than one instance of the application.  They'll
> coordinate by virtue of being part of a Kafka consumer group; if one
> crashes, the partitions that it was reading from will be picked up by other
> instances.
>
> - When running more than one instance, the tasks will be distributed
> between the instances.
>
> Confluent's docs on the Kafka Streams architecture goes into a lot more
> detail: http://docs.confluent.io/3.0.0/streams/architecture.html
>
>
>
>
> On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal  wrote:
>
> > Hi All,
> > We were able to run a stream processing application against a fairly
> decent
> > load of messages in production environment.
> >
> > To make the system robust say the stream processing application crashes,
> is
> > there a way to make it auto start from the point when it crashed?
> >
> > Also is there any concept like running the same application in a cluster,
> > where one fails, other takes over, until we bring back up the failed node
> > of streams application.
> >
> > If yes, is there any guidelines or some knowledge base we can look at to
> > understand how this would work.
> >
> > Is there way like in spark, where the driver program distributes the
> tasks
> > across various nodes in a cluster, is there something similar in kafka
> > streaming too.
> >
> > Thanks
> > Sachin
> >
>


Re: log.retention.hours not working?

2016-12-08 Thread Rodrigo Sandoval
This is what Todd said:

"Retention is going to be based on a combination of both the retention and
segment size settings (as a side note, it's recommended to use
log.retention.ms and log.segment.ms, not the hours config. That's there for
legacy reasons, but the ms configs are more consistent). As messages are
received by Kafka, they are written to the current open log segment for
each partition. That segment is rotated when either the log.segment.bytes
or the log.segment.ms limit is reached. Once that happens, the log segment
is closed and a new one is opened. Only after a log segment is closed can
it be deleted via the retention settings. Once the log segment is closed
AND either all the messages in the segment are older than log.retention.ms
OR the total partition size is greater than log.retention.bytes, then the
log segment is purged.

As a note, the default segment limit is 1 gibibyte. So if you've only
written in 1k of messages, you have a long way to go before that segment
gets rotated. This is why the retention is referred to as a minimum time.
You can easily retain much more than you're expecting for slow topics."

On Dec 9, 2016 02:38, "Rodrigo Sandoval"  wrote:

> Your understanding about segment.bytes and retention.ms is correct. But
> Todd Palino said just after having reached the segment size, that is when
> the segment is "closed"  PLUS all messages within the segment that was
> closed are older than the retention policy defined ( in this case
> retention.ms) THEN delete the segment.
>
> At least according to my testing, it is not necessary to wait until the
> segment is closed to delete it. Simply if all messages in a segment ( no
> matter if the segment reached the size defined by segment.bytes) are older
> than the policy defined by retention.ms , THEN delete the segment.
>
> I have been playing a lot today with kafka, and at least that is what I
> figured out.
>
> On Dec 9, 2016 02:13, "Sachin Mittal"  wrote:
>
>> I think segment.bytes defines the size of single log file before creating
>> a
>> new one.
>> retention.ms defines number of ms to wait on a log file before deleting
>> it.
>>
>> So it is working as defined in docs.
>>
>>
>> On Fri, Dec 9, 2016 at 2:42 AM, Rodrigo Sandoval <
>> rodrigo.madfe...@gmail.com
>> > wrote:
>>
>> > How is that about that when the segment size is reached, plus every
>> single
>> > message inside the segment is older than the retention time, then the
>> > segment will be deleted?
>> >
>> >
>> > I have been playing with Kafka and I have the following:
>> >
>> > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
>> > config retention.ms=6
>> >
>> > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
>> > —config file.delete.delay.ms=4
>> >
>> > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
>> > --config segment.bytes=40
>> >
>> > My understanding according to your thoughts is a segment will be deleted
>> > when the segment reaches the segment size defined above
>> > (segment.bytes=40) PLUS every single message within the segment is
>> > older than the retention time above defined (retention.ms=6).
>> >
>> > What I noticed is a segment of just 35 bytes, which contained just one
>> > message, was deleted after the minute (maybe a little more). Therefore,
>> the
>> > segment size was not met in order to delete it.
>> >
>>
>


Re: log.retention.hours not working?

2016-12-08 Thread Rodrigo Sandoval
Your understanding about segment.bytes and retention.ms is correct. But
Todd Palino said that only after the segment size has been reached, that is,
when the segment is "closed", PLUS all messages within the closed segment
are older than the retention policy defined (in this case retention.ms),
THEN the segment is deleted.

At least according to my testing, it is not necessary to wait until the
segment is closed to delete it. If all messages in a segment (no matter
whether the segment reached the size defined by segment.bytes) are older
than the policy defined by retention.ms, THEN the segment is deleted.

I have been playing a lot today with Kafka, and at least that is what I
figured out.

On Dec 9, 2016 02:13, "Sachin Mittal"  wrote:

> I think segment.bytes defines the size of single log file before creating a
> new one.
> retention.ms defines number of ms to wait on a log file before deleting
> it.
>
> So it is working as defined in docs.
>
>
> On Fri, Dec 9, 2016 at 2:42 AM, Rodrigo Sandoval <
> rodrigo.madfe...@gmail.com
> > wrote:
>
> > How is that about that when the segment size is reached, plus every
> single
> > message inside the segment is older than the retention time, then the
> > segment will be deleted?
> >
> >
> > I have been playing with Kafka and I have the following:
> >
> > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
> > config retention.ms=6
> >
> > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
> > —config file.delete.delay.ms=4
> >
> > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
> > --config segment.bytes=40
> >
> > My understanding according to your thoughts is a segment will be deleted
> > when the segment reaches the segment size defined above
> > (segment.bytes=40) PLUS every single message within the segment is
> > older than the retention time above defined (retention.ms=6).
> >
> > What I noticed is a segment of just 35 bytes, which contained just one
> > message, was deleted after the minute (maybe a little more). Therefore,
> the
> > segment size was not met in order to delete it.
> >
>


Re: Kafka windowed table not aggregating correctly

2016-12-08 Thread Sachin Mittal
Hi,
Right now, in order to circumvent this problem, I am using a timestamp whose
values increase by a few ms as and when I get new records.
So lets say I have records in order
A -> lower limit TS + 1 sec
B -> lower limit TS + 3 sec
C -> lower limit TS + 5 sec
..
Z -> upper limit TS - 1 sec

Now say I get a record ZZ with ts = upper limit TS + 1 sec. I assume it will
drop the previous windows and create new ones based on this timestamp.
Please confirm this understanding.

Now let's say I get a new record ZZA with timestamp (old) upper limit TS - 1
sec; will this again cause the new windows to be dropped and the older
windows to be recreated fresh, with all the aggregation done so far lost?

Thanks
Sachin




On Fri, Dec 9, 2016 at 12:16 AM, Guozhang Wang  wrote:

> Hello Sachin,
>
> I am with you that ideally the windowing segmentation implementation should
> be totally abstracted from users but today it is a bit confusing to
> understand. I have filed JIRA some time ago to improve on this end:
>
> https://issues.apache.org/jira/browse/KAFKA-3596
>
> So to your example, if a "far future record" was received whose timestamp
> is beyond current time + the retention period, it could potentially cause
> the current window to be dropped.
>
>
> Guozhang
>
>
> On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal  wrote:
>
> > Hi,
> > I think now it makes all the sense. The field I was using for timestamp
> > extractor contains timestamps which spans for greater than a day's
> duration
> > and it worked for wall clock because for short duration timestamps were
> in
> > day's range.
> >
> > I wanted to understand one thing:
> > Say I have a timestamp extractor field and as record gets ingested future
> > records will have increasing values for the timestamp.
> >
> > Now lets say default duration is one day. At a future time a record will
> > have timestamp which now is greater than the initial day's range.
> > What will happen then, it will create a new segment and then create
> windows
> > in it for the next day's duration?
> > What happens if now it gets a record from the previous day, will it get
> > discarded or will it again have just the single value aggregated in it
> > (previous values are lost).
> > So when new segment is create as I understand does it retain the older
> > segments data.
> >
> > This is bit confusing, so would be helpful if you can explain in bit more
> > detail.
> >
> > Thanks
> > Sachin
> >
> >
> > On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang 
> wrote:
> >
> > > Sachin,
> > >
> > > One thing to note is that the retention of the windowed stores works by
> > > keeping multiple segments of the stores where each segments stores a
> time
> > > range which can potentially span multiple windows, if a new window
> needs
> > to
> > > be created that is further from the oldest segment's time range +
> > retention
> > > period (from your code it seems you do not override it from
> > > TimeWindows.of("stream-table",
> > > 10 * 1000L).advanceBy(5 * 1000L), via until(...)), so the default of
> one
> > > day is used.
> > >
> > > So with WallclockTimeExtractor since it is using system time, it wont
> > give
> > > you timestamps that span for more than a day during a short period of
> > time,
> > > but if your own defined timestamps expand that value, then old segments
> > > will be dropped immediately and hence the aggregate values will be
> > returned
> > > as a single value.
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > > > The extractor is used in
> > > >
> > > > org.apache.kafka.streams.processor.internals.
> > RecordQueue#addRawRecords()
> > > >
> > > > Let us know, if you could resolve the problem or need more help.
> > > >
> > > > -Matthias
> > > >
> > > > On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > > > > https://github.com/SOHU-Co/kafka-node/ this is the node js client
> i
> > am
> > > > > using. The version is 0.5x. Can you please tell me what code in
> > streams
> > > > > calls the timestamp extractor. I can look there to see if there is
> > any
> > > > > issue.
> > > > >
> > > > > Again issue happens only when producing the messages using producer
> > > that
> > > > is
> > > > > compatible with kafka version 0.8x. I see that this producer does
> not
> > > > send
> > > > > a record timestamp as this was introduced in version 0.10 only.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" 
> > > > wrote:
> > > > >
> > > > >> I am not sure what is happening. That's why it would be good to
> > have a
> > > > >> toy example to reproduce the issue.
> > > > >>
> > > > >> What do you mean by "Kafka node version 0.5"?
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > > > >>> I can provide with the data but data does not seem to be the
> issue.
> > > > >>> If I submit the same data and use same timestamp extractor  using
> > the
> > 

Re: log.retention.hours not working?

2016-12-08 Thread Sachin Mittal
I think segment.bytes defines the size of a single log file before creating a
new one.
retention.ms defines the number of ms to wait on a log file before deleting it.

So it is working as defined in docs.


On Fri, Dec 9, 2016 at 2:42 AM, Rodrigo Sandoval  wrote:

> How is that about that when the segment size is reached, plus every single
> message inside the segment is older than the retention time, then the
> segment will be deleted?
>
>
> I have been playing with Kafka and I have the following:
>
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
> config retention.ms=6
>
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
> —config file.delete.delay.ms=4
>
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
> --config segment.bytes=40
>
> My understanding according to your thoughts is a segment will be deleted
> when the segment reaches the segment size defined above
> (segment.bytes=40) PLUS every single message within the segment is
> older than the retention time above defined (retention.ms=6).
>
> What I noticed is a segment of just 35 bytes, which contained just one
> message, was deleted after the minute (maybe a little more). Therefore, the
> segment size was not met in order to delete it.
>


Re: Running cluster of stream processing application

2016-12-08 Thread Mathieu Fenniak
Hi Sachin,

Some quick answers, and a link to some documentation to read more:

- If you restart the application, it will start from the point it crashed
(possibly reprocessing a small window of records).

- You can run more than one instance of the application.  They'll
coordinate by virtue of being part of a Kafka consumer group; if one
crashes, the partitions that it was reading from will be picked up by other
instances.

- When running more than one instance, the tasks will be distributed
between the instances.

Confluent's docs on the Kafka Streams architecture goes into a lot more
detail: http://docs.confluent.io/3.0.0/streams/architecture.html
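
A minimal sketch of the coordination point (assuming the 0.10.1 Streams API; the
names are made up): every instance runs the same program with the same
application.id, and that shared id is what puts them into one consumer group:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-app");               // identical on every machine
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
// Optionally run several threads per instance; tasks are balanced across
// all threads of all running instances, and re-balanced when one crashes.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);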




On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal  wrote:

> Hi All,
> We were able to run a stream processing application against a fairly decent
> load of messages in production environment.
>
> To make the system robust say the stream processing application crashes, is
> there a way to make it auto start from the point when it crashed?
>
> Also is there any concept like running the same application in a cluster,
> where one fails, other takes over, until we bring back up the failed node
> of streams application.
>
> If yes, is there any guidelines or some knowledge base we can look at to
> understand how this would work.
>
> Is there way like in spark, where the driver program distributes the tasks
> across various nodes in a cluster, is there something similar in kafka
> streaming too.
>
> Thanks
> Sachin
>


Running cluster of stream processing application

2016-12-08 Thread Sachin Mittal
Hi All,
We were able to run a stream processing application against a fairly decent
load of messages in a production environment.

To make the system robust, say the stream processing application crashes: is
there a way to make it auto-restart from the point where it crashed?

Also, is there any concept like running the same application in a cluster,
where if one fails, another takes over, until we bring back up the failed node
of the streams application?

If yes, are there any guidelines or some knowledge base we can look at to
understand how this would work?

Is there anything like in Spark, where the driver program distributes the tasks
across the various nodes in a cluster; is there something similar in Kafka
Streams too?

Thanks
Sachin


Running mirror maker between two different version of kafka

2016-12-08 Thread Vijayanand Rengarajan
Team,
I am trying to mirror a few topics from cluster A (version 0.8.1) to cluster B
(version 0.10.1.0), but due to version incompatibility I am getting the error
below. If any one of you has had similar issues, please share the
workaround/solution to this issue.
I am running the Kafka mirroring in the destination cluster, which has Kafka
version 0.10.1.0 installed.
There are no firewall or iptables rules between these two clusters.
 WARN [ConsumerFetcherThread-console-consumer-27615_kafkanode01-1481247967907-68767097-0-30], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@26902baa (kafka.consumer.ConsumerFetcherThread)
java.io.EOFException
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
        at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
        at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:109)
        at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:29)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

Thanks, vijayanand.

Re: log.retention.hours not working?

2016-12-08 Thread Rodrigo Sandoval Tejerina
How is it that when the segment size is reached, plus every single
message inside the segment is older than the retention time, then the segment
will be deleted?

I have been playing with Kafka and I have the following:

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 config 
retention.ms=6
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 —config 
file.delete.delay.ms=4
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --config 
segment.bytes=40
My understanding according to your thoughts is a segment will be deleted when 
the segment reaches the segment size defined above (segment.bytes=40)
PLUS every single message within the segment is older than the retention time 
above defined (retention.ms=6).

What I noticed is a segment of just 35 bytes, which contained just one message,
was deleted after the minute (maybe a little more). Therefore, the segment size 
was not met in order to delete it.


Kafka supported on AIX OS?

2016-12-08 Thread Jayanna, Gautham
Hi,
We are trying to determine if we can run Kafka on AIX OS; however, I could not
find definitive information on the wiki page or by searching the internet.
I would greatly appreciate it if you could let us know whether we can run Kafka
on AIX, or if there are plans to support AIX in a future release.

Regards,
Gautham




Re: [VOTE] 0.10.1.1 RC0

2016-12-08 Thread Bernard Leach
The scala 2.12 artifacts aren’t showing up, any chance of publishing them?

> On 9 Dec 2016, at 07:57, Vahid S Hashemian  wrote:
> 
> +1
> 
> Build and quickstart worked fine on Ubuntu, Mac, Windows 32 and 64 bit.
> 
> Thanks for running the release.
> 
> Regards,
> --Vahid 
> 
> 
> 
> 
> From:   Guozhang Wang 
> To: "users@kafka.apache.org" , 
> "d...@kafka.apache.org" , 
> kafka-clie...@googlegroups.com
> Date:   12/07/2016 02:47 PM
> Subject:[VOTE] 0.10.1.1 RC0
> 
> 
> 
> Hello Kafka users, developers and client-developers,
> 
> This is the first candidate for the release of Apache Kafka 0.10.1.1. This 
> is
> a bug fix release and it includes fixes and improvements from 27 JIRAs. 
> See
> the release notes for more details:
> 
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc0/RELEASE_NOTES.html
> 
> *** Please download, test and vote by Monday, 13 December, 8am PT ***
> 
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
> 
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc0/
> 
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
> 
> * Javadoc:
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc0/javadoc/
> 
> * Tag to be voted upon (off 0.10.0 branch) is the 0.10.0.1-rc0 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=8b77507083fdd427ce81021228e7e346da0d814c
> 
> 
> 
> Thanks,
> Guozhang
> 
> 
> 
> 



The connection between kafka and zookeeper is often closed by zookeeper, lead to NotLeaderForPartitionException: This server is not the leader for that topic-partition.

2016-12-08 Thread Jiecxy
Hi guys,

Situation:
  3 nodes, each 32G memory, CPU 24 cores, 1T hd.
  3 brokers on 3 nodes, and 3 zookeeper on these 3 nodes too, all the 
properties are default, start the zookeeper cluster and kafka cluster.
  Create a topic (3 replications, 6 partitions), like below:
bin/kafka-topics.sh --create --zookeeper hw168:2181 --replication-factor 3 
--partitions 6 --topic test
  And run the ProducerPerformance tool provided by Kafka on two of the nodes at the same
time, which means we have two producers; command like below:
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --topic 
test --num-records 1 --record-size 100 --throughput -1 --producer-props 
bootstrap.servers=hw168:9092 buffer.memory=67108864 batch.size=65536 acks=1

Problem:
  We can see from the producer, a lot of  NotLeaderForPartitionException:
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition.
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition.
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition.
…

Track the process (by using DEBUG):
  There is an INFO:
INFO Client session timed out, have not heard from server in 11647ms for 
sessionid 0x258de4a26a4, closing socket connection and attempting reconnect 
(org.apache.zookeeper.ClientCnxn)

  And we found that the connection between the zkClient (held by Kafka) and the ZooKeeper
server is closed by the ZooKeeper server; the reason is a timeout. For
details:
[2016-12-08 20:24:00,547] DEBUG Partition [test,5] on broker 1: Skipping 
update high watermark since Old hw 15986847 [8012779 : 1068525112] is larger 
than new hw 15986847 [8012779 : 1068525112] for partition [test,5]. All leo's 
are 16566175 [16025299 : 72477384],15986847 [8012779 : 1068525112],16103549 
[16025299 : 10485500] (kafka.cluster.Partition)
[2016-12-08 20:24:00,547] DEBUG Adding index entry 16566161 => 72475508 to 
16025299.index. (kafka.log.OffsetIndex)
[2016-12-08 20:24:11,368] DEBUG [Replica Manager on Broker 1]: Request key 
test-2 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
[2016-12-08 20:24:11,368] DEBUG Partition [test,2] on broker 1: Skipping 
update high watermark since Old hw 16064424 [16025299 : 5242750] is larger than 
new hw 16064424 [16025299 : 5242750] for partition [test,2]. All leo's are 
16566175 [16025299 : 72477384],16205274 [16025299 : 24116650],16064424 
[16025299 : 5242750] (kafka.cluster.Partition)
[2016-12-08 20:24:11,369] DEBUG [Replica Manager on Broker 1]: Produce to 
local log in 10821 ms (kafka.server.ReplicaManager)
[2016-12-08 20:24:11,369] INFO Client session timed out, have not heard 
from server in 11647ms for sessionid 0x258de4a26a4, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)

  Please look at the timestamps: there is no DEBUG between 20:24:00,547 and
20:24:11,368, which already exceeds the session timeout (6000 ms), so it
causes this disconnection. We kept digging:
  We found that it got stuck in the function
  selector.select(waitTimeOut); in the method doTransport(...) in class
org.apache.zookeeper.ClientCnxnSocketNIO,
  and that is the period where we got no DEBUG.
  For more details, the call procedure (ZooKeeper client) is:
  org.apache.zookeeper.ClientCnxn -> run() -> doTransport(..)
  In the function run(), every time it checks whether there is a timeout; if not,
it runs doTransport, but doTransport costs about 10 s, so on the next
loop it will find the timeout.

  Going further, I thought there could be a deadlock at that time, so I kept
printing the jstack of the Kafka and ZooKeeper processes, using a shell loop like below:
  while true; do echo -e "\n\n"`date "+%Y-%m-%d %H:%M:%S,%N"`"\n"`jstack 
12165` >> zkjstack; echo -e "\n\n"`date "+%Y-%m-%d %H:%M:%S,%N"`"\n"`jstack 
12425` >> kafkajstack; done
  And I checked the period where we got NO DEBUG in the stack logs; surprisingly, there is
also NO LOG at that time!!! Why?

So I'm confused: why did it get stuck in that function? Why is there no DEBUG
or LOG in that weird period? Please help me. Thank you all.
  

Xiaoyuan



kafka streams passes org.apache.kafka.streams.kstream.internals.Change to my app!!

2016-12-08 Thread Ara Ebrahimi
Hi,

Once in a while, and quite randomly, this happens, but it does happen every few
hundred thousand messages:

2016-12-03 11:48:05 ERROR StreamThread:249 - stream-thread [StreamThread-4] Streams application error during processing:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.internals.Change cannot be cast to com.argyledata.streams.entity.Activity
        at com.argyledata.streams.StreamPipeline$$Lambda$14/33419717.apply(Unknown Source)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
        at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:199)
        at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
        at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
        at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateValueGetter.get(KStreamAggregate.java:112)
        at org.apache.kafka.streams.kstream.internals.KStreamKTableLeftJoin$KStreamKTableLeftJoinProcessor.process(KStreamKTableLeftJoin.java:61)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

Has anyone else seen this weird problem?

Ara.









Re: Creating a connector with Kafka Connect Distributed returning 500 error

2016-12-08 Thread Phillip Mann
Hello Konstantine and community,

I was able to fix this problem by using the latest version of Confluent 
Platform.  I was running CP 3.0.1 but upgraded to 3.1.1 and my worker and 
connector behaved as expected.  Thanks!

Phillip

From: Konstantine Karantasis 
Date: Wednesday, December 7, 2016 at 1:02 PM
To: Phillip Mann 
Cc: "users@kafka.apache.org" 
Subject: Re: Creating a connector with Kafka Connect Distributed returning 500 
error


The bug I was referring to was only in trunk for just a while. Thus, your issue 
must be related to something else, even though the response statuses are 
similar.

Let me know if you want to share a bigger and more detailed (DEBUG level at 
least) snapshot of the parts of the logs that might be related to this failure.

Cheers,
Konstantine

On Wed, Dec 7, 2016 at 11:15 AM, Phillip Mann <pm...@trulia.com> wrote:
Hello Konstantine,

Thanks for your reply.

I am using Confluent 3.0.1 installed on my machine and our cluster.  However, 
our AWS cluster has Confluent 3.1.1 installed so I will test with 3.1.1 client 
and cluster and see if this resolves the issue.  Additionally, I’ll use the 
debug levels if this does not resolve my issue.

If not, I’ll explore the trunk repo but I would prefer to use stable versions 
of CP / Kafka that can be accessed with Maven.

Thanks again.

Phillip

> Hi Phillip,
>
> may I ask which Kafka version did you use?
>
> trunk repo in Apache Kafka contained briefly a bug in Connect framework
> (during the past week) that produced failures similar to the one you
> describe (only in distributed mode). A fix has been pushed since yesterday.
>
> 3) Some useful step-by-step information is provided in the quickstart guide
> here:
> https://kafka.apache.org/quickstart#quickstart_kafkaconnect
>
> as well as in the documentation of Confluent:
> http://docs.confluent.io/3.1.0/connect/quickstart.html#
>
> Alternatively, you might want to follow the quickstart guide of one of the
> open source connectors, here:
> http://docs.confluent.io/3.1.0/connect/connectors.html
>
> 2) From what you mention above, it seems more like that you're hitting this
> temporary bug. But again that depends on which Kafka version you've been
> using.
>
> 1) Generating logs, in one of the debug levels (e.g. DEBUG, TRACE) is
> usually a useful source of information.
> Alternatively you may chose to run Connect in debug mode by setting the
> environment variable KAFKA_DEBUG and attaching a remote debugger to it
> (such as IntelliJ's remote debugging capability). With respect to live
> debugging, we are planning to post a step-by-step guide for Kafka and Kafka
> Connect soon.
>
> Regards,
> Konstantine
>
>> On Tue, Dec 6, 2016 at 11:22 AM, Phillip Mann <pm...@trulia.com> wrote:
>>
>> I am working on migrating from Camus to Kafka Connect. I am working on the
>> implementation of Kafka Connect and specifically focused on distributed
>> mode. I am able to start a worker successfully on my local machine which I
>> assume communicates with my Kafka cluster. I am further able to run two GET
>> commands such as / and /connector-plugins which return the correct JSON.
>> However, when I try to POST a command to create a connector, I receive a
>> 500 error and a time out. Specifically, I use this command to POST for
>> testing:
>>
>> curl -X POST -H "Content-Type: application/json" --data '{"name":
>> "local-file-sink", "config": {"connector.class":"FileStreamSinkConnector",
>> "tasks.max":"1", "file":"test.sink.txt", "topics":"myTopic" }}'
>> localhost:8083/connectors
>>
>> and eventually I get this response:
>>
>> {"error_code": 500, "message": "Request timed out"}
>>
>> I am lost as to what is going on. The logs from my Kafka Connect
>> distributed worker show this:
>>
>> [2016-12-05 14:34:32,436] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:22:34:32
>> +] "GET /connector-plugins HTTP/1.1" 200 315  2
>> (org.apache.kafka.connect.runtime.rest.RestServer:60)
>> [2016-12-05 15:05:25,422] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:05:25
>> +] "GET /connector-plugins HTTP/1.1" 200 315  3
>> (org.apache.kafka.connect.runtime.rest.RestServer:60)
>> [2016-12-05 15:05:28,389] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:05:28
>> +] "GET /connector-plugins HTTP/1.1" 200 315  2
>> (org.apache.kafka.connect.runtime.rest.RestServer:60)
>> [2016-12-05 15:07:38,644] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:06:08
>> +] "GET /connectors HTTP/1.1" 500 48  90003 (org.apache.kafka.connect.
>> runtime.rest.RestServer:60)
>> [2016-12-05 15:07:44,450] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:07:44
>> +] "GET /connector-plugins HTTP/1.1" 200 315  1
>> (org.apache.kafka.connect.runtime.rest.RestServer:60)
>> [2016-12-05 15:13:06,703] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:11:36
>> +] "POST /connectors HTTP/1.1" 500 48  90003 (org.apache.kafka.connect.
>> runtime.rest.RestServer:60)
>> [2016-12-05 15:15:38,506] INFO 0:0:0:0:0:0:0:

controlling memory growth when aggregating

2016-12-08 Thread Jon Yeargers
I'm working with JSON data that has an array member. I'm aggregating values
into this using minute-long windows.

I ran the app for ~10 minutes and watched it consume 40% of the memory on a
box with 32G. It was still growing when I stopped it. At this point it had
created ~800 values each of which was < 1Mb in size (owing to the
limitations on message size set at the broker). (I wrote all the values
into Redis so I could count them and check the aggregation).

1. Why is it consuming so much memory?
2. Is there a strategy for controlling this growth?

I get that it's keeping every window open in case a new value shows up.
Maybe some way to relax this using event time vs clock time?
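
For reference, the two knobs I understand to be relevant here (assuming the 0.10.1
API; the names and values below are made up, not recommendations) are the record
cache size and the window retention:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "minute-agg");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
// Cap the record cache shared by this instance's stream threads (cache.max.bytes.buffering).
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// Window retention is set on the window spec itself; without until() the
// windows are kept for a default of one day, e.g.:
// TimeWindows.of(60 * 1000L).until(15 * 60 * 1000L)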


Re: log.retention.hours not working?

2016-12-08 Thread Rodrigo Sandoval
How is it that when the segment size is reached, plus every single
message inside the segment is older than the retention time, then the
segment will be deleted?


I have been playing with Kafka and I have the following:

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
config retention.ms=6

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
—config file.delete.delay.ms=4

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
--config segment.bytes=40

My understanding according to your thoughts is a segment will be deleted
when the segment reaches the segment size defined above
(segment.bytes=40) PLUS every single message within the segment is
older than the retention time above defined (retention.ms=6).

What I noticed is a segment of just 35 bytes, which contained just one
message, was deleted after the minute (maybe a little more). Therefore, the
segment size was not met in order to delete it.


Re: [VOTE] 0.10.1.1 RC0

2016-12-08 Thread Vahid S Hashemian
+1

Build and quickstart worked fine on Ubuntu, Mac, Windows 32 and 64 bit.

Thanks for running the release.

Regards,
--Vahid 




From:   Guozhang Wang 
To: "users@kafka.apache.org" , 
"d...@kafka.apache.org" , 
kafka-clie...@googlegroups.com
Date:   12/07/2016 02:47 PM
Subject:[VOTE] 0.10.1.1 RC0



Hello Kafka users, developers and client-developers,

This is the first candidate for the release of Apache Kafka 0.10.1.1. This 
is
a bug fix release and it includes fixes and improvements from 27 JIRAs. 
See
the release notes for more details:

http://home.apache.org/~guozhang/kafka-0.10.1.1-rc0/RELEASE_NOTES.html

*** Please download, test and vote by Monday, 13 December, 8am PT ***

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~guozhang/kafka-0.10.1.1-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
http://home.apache.org/~guozhang/kafka-0.10.1.1-rc0/javadoc/

* Tag to be voted upon (off 0.10.0 branch) is the 0.10.0.1-rc0 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=8b77507083fdd427ce81021228e7e346da0d814c



Thanks,
Guozhang






Re: Kafka windowed table not aggregating correctly

2016-12-08 Thread Guozhang Wang
Hello Sachin,

I am with you that ideally the windowing segmentation implementation should
be totally abstracted from users, but today it is a bit confusing to
understand. I filed a JIRA some time ago to improve on this end:

https://issues.apache.org/jira/browse/KAFKA-3596

So to your example, if a "far future record" was received whose timestamp
is beyond current time + the retention period, it could potentially cause
the current window to be dropped.
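
As a rough illustration, using the window definition quoted below, until(...) is
what controls that retention (the value here is only an example):

// Only until(...) is new compared to the definition in the quoted message:
// windows are retained for ~7 days instead of the default one day.
TimeWindows.of("stream-table", 10 * 1000L)
           .advanceBy(5 * 1000L)
           .until(7 * 24 * 60 * 60 * 1000L);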


Guozhang


On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal  wrote:

> Hi,
> I think now it makes all the sense. The field I was using for timestamp
> extractor contains timestamps which spans for greater than a day's duration
> and it worked for wall clock because for short duration timestamps were in
> day's range.
>
> I wanted to understand one thing:
> Say I have a timestamp extractor field and as record gets ingested future
> records will have increasing values for the timestamp.
>
> Now lets say default duration is one day. At a future time a record will
> have timestamp which now is greater than the initial day's range.
> What will happen then, it will create a new segment and then create windows
> in it for the next day's duration?
> What happens if now it gets a record from the previous day, will it get
> discarded or will it again have just the single value aggregated in it
> (previous values are lost).
> So when new segment is create as I understand does it retain the older
> segments data.
>
> This is bit confusing, so would be helpful if you can explain in bit more
> detail.
>
> Thanks
> Sachin
>
>
> On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang  wrote:
>
> > Sachin,
> >
> > One thing to note is that the retention of the windowed stores works by
> > keeping multiple segments of the stores where each segments stores a time
> > range which can potentially span multiple windows, if a new window needs
> to
> > be created that is further from the oldest segment's time range +
> retention
> > period (from your code it seems you do not override it from
> > TimeWindows.of("stream-table",
> > 10 * 1000L).advanceBy(5 * 1000L), via until(...)), so the default of one
> > day is used.
> >
> > So with WallclockTimeExtractor since it is using system time, it wont
> give
> > you timestamps that span for more than a day during a short period of
> time,
> > but if your own defined timestamps expand that value, then old segments
> > will be dropped immediately and hence the aggregate values will be
> returned
> > as a single value.
> >
> > Guozhang
> >
> >
> > On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax 
> > wrote:
> >
> > > The extractor is used in
> > >
> > > org.apache.kafka.streams.processor.internals.
> RecordQueue#addRawRecords()
> > >
> > > Let us know, if you could resolve the problem or need more help.
> > >
> > > -Matthias
> > >
> > > On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > > > https://github.com/SOHU-Co/kafka-node/ this is the node js client i
> am
> > > > using. The version is 0.5x. Can you please tell me what code in
> streams
> > > > calls the timestamp extractor. I can look there to see if there is
> any
> > > > issue.
> > > >
> > > > Again issue happens only when producing the messages using producer
> > that
> > > is
> > > > compatible with kafka version 0.8x. I see that this producer does not
> > > send
> > > > a record timestamp as this was introduced in version 0.10 only.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" 
> > > wrote:
> > > >
> > > >> I am not sure what is happening. That's why it would be good to
> have a
> > > >> toy example to reproduce the issue.
> > > >>
> > > >> What do you mean by "Kafka node version 0.5"?
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > > >>> I can provide with the data but data does not seem to be the issue.
> > > >>> If I submit the same data and use same timestamp extractor  using
> the
> > > >> java
> > > >>> client with kafka version 0.10.0.1 aggregation works fine.
> > > >>> I find the issue only when submitting the data with kafka node
> > version
> > > >> 0.5.
> > > >>> It looks like the stream does not extract the time correctly in
> that
> > > >> case.
> > > >>>
> > > >>> Thanks
> > > >>> Sachin
> > > >>>
> > > >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax"  >
> > > >> wrote:
> > > >>>
> > >  Can you provide example input data (including timetamps) and
> result.
> > >  What is the expected result (ie, what aggregation do you apply)?
> > > 
> > > 
> > >  -Matthias
> > > 
> > >  On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > > > Hi,
> > > > After much debugging I found an issue with timestamp extractor.
> > > >
> > > > If I use a custom timestamp extractor with following code:
> > > > public static class MessageTimestampExtractor implements
> > > > TimestampExtractor {
> > > > public long extract(ConsumerRecord
> record)
> > {

Re: KafkaStreams metadata - enum keys?

2016-12-08 Thread Damian Guy
Yes, it could be an issue when you initially start up. If it is the first
time you run the app and there are internal topics created by Kafka
Streams, there will be rebalances. However, it depends on your topology.

How are you trying to access the state store?

Thanks,
Damian

On Thu, 8 Dec 2016 at 17:49 Jon Yeargers  wrote:

> Im only running one consumer-instance so would rebalancing / wrong host be
> an issue?
>
>
>
> On Thu, Dec 8, 2016 at 7:31 AM, Damian Guy  wrote:
>
> > Hi Jon,
> >
> > How are you trying to access the store?
> >
> > That exception is thrown in a few circumstances:
> > 1. KafkaStreams hasn't initialized or is re-initializing due to a
> > rebalance. This can occur for a number of reasons, i.e., new
> > topics/partitions being added to the broker (including streams internal
> > topics), broker going down, StreamThreads starting or stopping etc
> > 2. The StateStore has just been closed, which would usually mean that 1.
> is
> > about to happen
> > 3. The StateStore with that name and type doesn't exist on the local
> > KafkaStreams instance.
> >
> > Thanks,
> > Damian
> >
> > On Thu, 8 Dec 2016 at 11:57 Jon Yeargers 
> wrote:
> >
> > > Tried calling that - got this exception (FWIW - there isn't any other
> > > instance)
> > >
> > > State store value comes from
> > >
> > > groupByKey().aggregate(LogLine::new,
> > > new aggregate(),
> > > TimeWindows.of(60 * 60 * 1000L),
> > > collectorSerde, "minute_agg_stream");
> > >
> > > 2016-12-08 11:33:50,924 [qtp1318180415-18] DEBUG
> > > o.eclipse.jetty.server.HttpChannel - Could not send response error 500,
> > > already committed
> > >
> > > javax.servlet.ServletException:
> > > org.apache.kafka.streams.errors.InvalidStateStoreException: the state
> > > store, minute_agg_stream, may have migrated to another instance.
> > >
> > > at
> > >
> > > org.glassfish.jersey.servlet.WebComponent.serviceImpl(
> > WebComponent.java:489)
> > >
> > > at org.glassfish.jersey.servlet.WebComponent.service(
> > WebComponent.java:427)
> > >
> > > at
> > >
> > > org.glassfish.jersey.servlet.ServletContainer.service(
> > ServletContainer.java:388)
> > >
> > > at
> > >
> > > org.glassfish.jersey.servlet.ServletContainer.service(
> > ServletContainer.java:341)
> > >
> > > at
> > >
> > > org.glassfish.jersey.servlet.ServletContainer.service(
> > ServletContainer.java:228)
> > >
> > > at org.eclipse.jetty.servlet.ServletHolder.handle(
> > ServletHolder.java:845)
> > >
> > > at
> > > org.eclipse.jetty.servlet.ServletHandler.doHandle(
> > ServletHandler.java:584)
> > >
> > > at
> > >
> > > org.eclipse.jetty.server.session.SessionHandler.
> > doHandle(SessionHandler.java:224)
> > >
> > > at
> > >
> > > org.eclipse.jetty.server.handler.ContextHandler.
> > doHandle(ContextHandler.java:1180)
> > >
> > > at
> > > org.eclipse.jetty.servlet.ServletHandler.doScope(
> > ServletHandler.java:512)
> > >
> > > at
> > >
> > > org.eclipse.jetty.server.session.SessionHandler.
> > doScope(SessionHandler.java:185)
> > >
> > > at
> > >
> > > org.eclipse.jetty.server.handler.ContextHandler.
> > doScope(ContextHandler.java:1112)
> > >
> > > at
> > >
> > > org.eclipse.jetty.server.handler.ScopedHandler.handle(
> > ScopedHandler.java:141)
> > >
> > > at
> > >
> > > org.eclipse.jetty.server.handler.HandlerWrapper.handle(
> > HandlerWrapper.java:134)
> > >
> > > at org.eclipse.jetty.server.Server.handle(Server.java:534)
> > >
> > > at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
> > >
> > > at
> > > org.eclipse.jetty.server.HttpConnection.onFillable(
> > HttpConnection.java:251)
> > >
> > > at
> > > org.eclipse.jetty.io
> > > .AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
> > >
> > > at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95)
> > >
> > > at
> > > org.eclipse.jetty.io
> > > .SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
> > >
> > > at
> > >
> > > org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.
> > executeProduceConsume(ExecuteProduceConsume.java:303)
> > >
> > > at
> > >
> > > org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.
> > produceConsume(ExecuteProduceConsume.java:148)
> > >
> > > at
> > >
> > > org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(
> > ExecuteProduceConsume.java:136)
> > >
> > > at
> > >
> > > org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(
> > QueuedThreadPool.java:671)
> > >
> > > at
> > >
> > > org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(
> > QueuedThreadPool.java:589)
> > >
> > > at java.lang.Thread.run(Thread.java:745)
> > >
> > > Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
> > the
> > > state store, minute_agg_stream, may have migrated to another instance.
> > >
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.QueryableStoreProvider.
> > getStore(QueryableStoreProvider.java:49)
> > >
> > > at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)
> > >
> > > at
> > >
> >

Re: KafkaStreams metadata - enum keys?

2016-12-08 Thread Jon Yeargers
I'm only running one consumer instance, so would rebalancing / a wrong host be
an issue?



On Thu, Dec 8, 2016 at 7:31 AM, Damian Guy  wrote:

> Hi Jon,
>
> How are you trying to access the store?
>
> That exception is thrown in a few circumstances:
> 1. KafkaStreams hasn't initialized or is re-initializing due to a
> rebalance. This can occur for a number of reasons, i.e., new
> topics/partitions being added to the broker (including streams internal
> topics), broker going down, StreamThreads starting or stopping etc
> 2. The StateStore has just been closed, which would usually mean that 1. is
> about to happen
> 3. The StateStore with that name and type doesn't exist on the local
> KafkaStreams instance.
>
> Thanks,
> Damian
>
> On Thu, 8 Dec 2016 at 11:57 Jon Yeargers  wrote:
>
> > Tried calling that - got this exception (FWIW - there isn't any other
> > instance)
> >
> > State store value comes from
> >
> > groupByKey().aggregate(LogLine::new,
> > new aggregate(),
> > TimeWindows.of(60 * 60 * 1000L),
> > collectorSerde, "minute_agg_stream");
> >
> > 2016-12-08 11:33:50,924 [qtp1318180415-18] DEBUG
> > o.eclipse.jetty.server.HttpChannel - Could not send response error 500,
> > already committed
> >
> > javax.servlet.ServletException:
> > org.apache.kafka.streams.errors.InvalidStateStoreException: the state
> > store, minute_agg_stream, may have migrated to another instance.
> >
> > at
> >
> > org.glassfish.jersey.servlet.WebComponent.serviceImpl(
> WebComponent.java:489)
> >
> > at org.glassfish.jersey.servlet.WebComponent.service(
> WebComponent.java:427)
> >
> > at
> >
> > org.glassfish.jersey.servlet.ServletContainer.service(
> ServletContainer.java:388)
> >
> > at
> >
> > org.glassfish.jersey.servlet.ServletContainer.service(
> ServletContainer.java:341)
> >
> > at
> >
> > org.glassfish.jersey.servlet.ServletContainer.service(
> ServletContainer.java:228)
> >
> > at org.eclipse.jetty.servlet.ServletHolder.handle(
> ServletHolder.java:845)
> >
> > at
> > org.eclipse.jetty.servlet.ServletHandler.doHandle(
> ServletHandler.java:584)
> >
> > at
> >
> > org.eclipse.jetty.server.session.SessionHandler.
> doHandle(SessionHandler.java:224)
> >
> > at
> >
> > org.eclipse.jetty.server.handler.ContextHandler.
> doHandle(ContextHandler.java:1180)
> >
> > at
> > org.eclipse.jetty.servlet.ServletHandler.doScope(
> ServletHandler.java:512)
> >
> > at
> >
> > org.eclipse.jetty.server.session.SessionHandler.
> doScope(SessionHandler.java:185)
> >
> > at
> >
> > org.eclipse.jetty.server.handler.ContextHandler.
> doScope(ContextHandler.java:1112)
> >
> > at
> >
> > org.eclipse.jetty.server.handler.ScopedHandler.handle(
> ScopedHandler.java:141)
> >
> > at
> >
> > org.eclipse.jetty.server.handler.HandlerWrapper.handle(
> HandlerWrapper.java:134)
> >
> > at org.eclipse.jetty.server.Server.handle(Server.java:534)
> >
> > at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
> >
> > at
> > org.eclipse.jetty.server.HttpConnection.onFillable(
> HttpConnection.java:251)
> >
> > at
> > org.eclipse.jetty.io
> > .AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
> >
> > at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95)
> >
> > at
> > org.eclipse.jetty.io
> > .SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
> >
> > at
> >
> > org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.
> executeProduceConsume(ExecuteProduceConsume.java:303)
> >
> > at
> >
> > org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.
> produceConsume(ExecuteProduceConsume.java:148)
> >
> > at
> >
> > org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(
> ExecuteProduceConsume.java:136)
> >
> > at
> >
> > org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(
> QueuedThreadPool.java:671)
> >
> > at
> >
> > org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(
> QueuedThreadPool.java:589)
> >
> > at java.lang.Thread.run(Thread.java:745)
> >
> > Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
> the
> > state store, minute_agg_stream, may have migrated to another instance.
> >
> > at
> >
> > org.apache.kafka.streams.state.internals.QueryableStoreProvider.
> getStore(QueryableStoreProvider.java:49)
> >
> > at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)
> >
> > at
> >
> > com.cedexis.prtminuteagg.RestService.rangeForKeyValueStore(
> RestService.java:190)
> >
> > at
> > com.cedexis.prtminuteagg.RestService.keyRangeForStore(
> RestService.java:99)
> >
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >
> > at
> >
> > sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> >
> > at
> >
> > sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> >
> > at java.lang.reflect.Method.invoke(Method.java:498)
> >
> > at
> >
> > org.glassfish.jersey.server.model.internal.
> ResourceMethodInvocationHandlerFactory$1.invoke(
> ResourceMethodInvocationH

Re: Q about doc of consumer

2016-12-08 Thread Vahid S Hashemian
Ryan,

The correct consumer command in the latest doc (
http://kafka.apache.org/quickstart#quickstart_consume) is

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 
test --from-beginning

You used the "--zookeeper" parameter which implies using the old consumer, 
in which case the correct port is 2181, as you figured out.
Using "--bootstrap-server" and port 9092 implies using the new (Java) 
consumer.

Regards,
--Vahid




From:   "paradixrain" 
To: "users" 
Date:   12/07/2016 09:10 PM
Subject:Q about doc of consumer



Dear Kafka,
I think there is an error in the document; is that right?


Here's what I did:
Step 1:
 open a producer
./kafka-console-producer.sh --broker-list localhost:9092 --topic test

Step 2:
open a consumer
./kafka-console-consumer.sh --zookeeper localhost:9092 --topic test 
--from-beginning

Step 3:
I entered something in the producer,

but got the errors below in the consumer:


Step 4:
I changed the port in Step 2 from 9092 to 2181 and restarted the consumer;
after that I got what I wanted.


--
YOUR FRIEND,
Ryan
 





kafka commands taking a long time

2016-12-08 Thread Stephen Cresswell
I followed the quickstart instructions at
https://kafka.apache.org/quickstart and everything seems to be working ok,
except that commands take a long time to run, e.g.

$ time bin/kafka-topics.sh --list --zookeeper localhost:2181

real 0m11.751s
user 0m1.540s
sys 0m0.273s

The zookeeper logging shows that the request is processed in a few
milliseconds, so I think it's related to the kafka JVM configuration. If I
remove com.sun.management.jmxremote it takes 6 seconds, but this is still
much longer than I would have expected.

Any suggestions on how to speed things up?


Configuration for low latency and low cpu utilization? java/librdkafka

2016-12-08 Thread Niklas Ström
Use case scenario:
We want to have a fairly low latency, say below 20 ms, and we want to be
able to run a few hundred processes (on one machine) both producing and
consuming a handful of topics. The throughput is not high, lets say on
average 10 messages per second for each process. Most messages are 50-500
bytes large, some may be a few kbytes.

How should we adjust the configuration parameters for our use case?

Our experiments so far give us good latency, but at the expense of CPU
utilization. Even with poor latency, the CPU utilization is not
satisfactory. Since we will have a lot of processes, we are concerned that
short poll loops will cause an overconsumption of CPU capacity. We are
hoping we might have missed some configuration parameter or that we have
some issues with our environment that we can find and solve.

We are using both the java client and librdkafka and see similar CPU issues
in both clients.

We have looked at recommendations from:
https://github.com/edenhill/librdkafka/wiki/How-to-decrease-message-latency
The only thing that seems to really make a difference for librdkafka is
socket.blocking.max.ms, but reducing that also makes the CPU go up.

I would really appreciate input on configuration parameters and on any
experience with environment issues that have caused high CPU load. Or is our
scenario not feasible at all?
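
For what it's worth, with the Java client the settings that usually trade
latency against CPU are the fetch and linger configs. A hedged sketch of the
kind of values to experiment with (the broker address, group id and all
numbers are placeholders to tune for the actual workload):

import java.util.Properties;

public class LowLatencyConfigs {
    // Consumer side: a modest fetch.max.wait.ms lets the broker park the fetch
    // instead of the client spinning, which lowers CPU at a small latency cost.
    static Properties consumerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");  // placeholder
        p.put("group.id", "low-latency-test");         // placeholder
        p.put("fetch.min.bytes", "1");                 // return as soon as any data is available
        p.put("fetch.max.wait.ms", "20");              // upper bound on how long the broker holds the fetch
        p.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return p;
    }

    // Producer side: linger.ms=0 sends immediately; acks=1 avoids waiting for the full ISR.
    static Properties producerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");  // placeholder
        p.put("linger.ms", "0");
        p.put("acks", "1");
        p.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return p;
    }
}

Passing a longer timeout to poll() instead of polling with 0 also keeps the
consumer loop from spinning.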

Cheers


Re: KafkaStreams metadata - enum keys?

2016-12-08 Thread Damian Guy
Hi Jon,

How are you trying to access the store?

That exception is thrown in a few circumstances:
1. KafkaStreams hasn't initialized or is re-initializing due to a
rebalance. This can occur for a number of reasons, i.e., new
topics/partitions being added to the broker (including streams internal
topics), broker going down, StreamThreads starting or stopping etc
2. The StateStore has just been closed, which would usually mean that 1. is
about to happen
3. The StateStore with that name and type doesn't exist on the local
KafkaStreams instance.
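
A common way to cope with case 1 is to simply retry the store lookup until the
rebalance settles. A rough sketch (the store name and key/value types are
placeholders for whatever your topology registers):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public final class StoreLookup {
    // Retries KafkaStreams.store() while the local instance is still
    // initializing or rebalancing (case 1 above).
    static <K, V> ReadOnlyKeyValueStore<K, V> waitForStore(KafkaStreams streams, String storeName)
            throws InterruptedException {
        while (true) {
            try {
                return streams.store(storeName, QueryableStoreTypes.<K, V>keyValueStore());
            } catch (InvalidStateStoreException retriable) {
                // Store not available (yet) on this instance; back off and try again.
                Thread.sleep(100);
            }
        }
    }
}

For case 3 the request has to be routed to the instance that actually owns the
key, which is what the streams metadata calls (allMetadataForStore /
metadataForKey) are for.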

Thanks,
Damian

On Thu, 8 Dec 2016 at 11:57 Jon Yeargers  wrote:

> Tried calling that - got this exception (FWIW - there isn't any other
> instance)
>
> State store value comes from
>
> groupByKey().aggregate(LogLine::new,
> new aggregate(),
> TimeWindows.of(60 * 60 * 1000L),
> collectorSerde, "minute_agg_stream");
>
> 2016-12-08 11:33:50,924 [qtp1318180415-18] DEBUG
> o.eclipse.jetty.server.HttpChannel - Could not send response error 500,
> already committed
>
> javax.servlet.ServletException:
> org.apache.kafka.streams.errors.InvalidStateStoreException: the state
> store, minute_agg_stream, may have migrated to another instance.
>
> at
>
> org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)
>
> at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)
>
> at
>
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
>
> at
>
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
>
> at
>
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
>
> at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)
>
> at
> org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584)
>
> at
>
> org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:224)
>
> at
>
> org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
>
> at
> org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
>
> at
>
> org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
>
> at
>
> org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
>
> at
>
> org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>
> at
>
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
>
> at org.eclipse.jetty.server.Server.handle(Server.java:534)
>
> at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
>
> at
> org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
>
> at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
>
> at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95)
>
> at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
>
> at
>
> org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
>
> at
>
> org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
>
> at
>
> org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
>
> at
>
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
>
> at
>
> org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: the
> state store, minute_agg_stream, may have migrated to another instance.
>
> at
>
> org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:49)
>
> at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)
>
> at
>
> com.cedexis.prtminuteagg.RestService.rangeForKeyValueStore(RestService.java:190)
>
> at
> com.cedexis.prtminuteagg.RestService.keyRangeForStore(RestService.java:99)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
>
> org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
>
> at
>
> org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)
>
> at
>
> org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161)
>
> at
>
> org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:205)
>
> at
>
> org.glassfish.jersey.server.model.internal.Ab

Re: kafka cluster network bandwidth is too high

2016-12-08 Thread Stephen Powis
Yea, we have a 7 node cluster with ~200 topics and see sustained 100Mbps
going between the nodes.  Very bandwidth hungry :p

On Thu, Dec 8, 2016 at 1:51 AM, Matthias J. Sax 
wrote:

> You cannot send images over the mailing list. They get automatically
> removed.
>
> On 12/6/16 11:55 PM, 陈超 wrote:
> > Hi kafka developer,
> >
> >
> >
> >  I have a Kafka cluster with 3 nodes, and it has 3 topics now. We do not
> > have much data going into the Kafka topics yet, but the node-to-node sync
> > bandwidth is up to 4 Mb/s. I don't know why it is so high.
> > The pictures are below:
> >
> >
> >
> > Iftop info:
> >
> > [inline image removed]
> >
> >
> >
> > Ps –mp kafkapid –o THREAD,tid,time
> >
> > [inline image removed]
> >
> >
> >
> > Jstack info:
> >
> > [inline image removed]
> >
> >
> >
> > Can you help me find the problem?
> >
> > Thank you very much!
> >
>
>


Upgrading from 0.10.0.1 to 0.10.1.0

2016-12-08 Thread Hagen Rother
Hi,

I am testing an upgrade and I am stuck on the mirror maker.

- New consumer doesn't like the old brokers
- Old consumer comes up, but does nothing and throws
a java.net.SocketTimeoutException after a while.

What's the correct upgrade strategy when mirroring is used?

Thanks!
Hagen


Re: Kafka and zookeeper stores and mesos env

2016-12-08 Thread Mike Marzo
Understood, and I am looking at that bit, but I would still like to know the
answer.

On Thu, Dec 8, 2016 at 8:22 AM, Asaf Mesika  wrote:

> Off-question a bit - Using the Kafka Mesos framework should save you from
> handling those questions: https://github.com/mesos/kafka
>
>
> On Thu, Dec 8, 2016 at 2:33 PM Mike Marzo 
> wrote:
>
> If I'm running a 5-node ZK cluster and a 3-node Kafka cluster in Docker on a
> mesos/marathon environment, where my ZK and broker nodes are all leveraging
> local disk on the hosts they are running on, is there any value to the local
> data being preserved across restarts?
>
> In other words, when a broker node fails and restarts on the same
> machine, does it leverage any of the actual data it has on disk from its
> prior life, or is it assumed to start fresh, being re-hydrated from the
> other in-sync nodes in the running cluster? Same question for
> zookeeper...
>
> I'm trying to assess the value of local data preservation across
> marathon-rescheduled jobs. I'm thinking the data is blown away, as much of it
> would be stale, but I'm not sure, since log retentions could be set quite
> high and having history could make re-sync more efficient by effectively
> only closing the gap for deltas while down. Anyone know?
>


Re: Kafka and zookeeper stores and mesos env

2016-12-08 Thread Asaf Mesika
Off-question a bit - Using the Kafka Mesos framework should save you from
handling those questions: https://github.com/mesos/kafka


On Thu, Dec 8, 2016 at 2:33 PM Mike Marzo 
wrote:

If I'm running a 5-node ZK cluster and a 3-node Kafka cluster in Docker on a
mesos/marathon environment, where my ZK and broker nodes are all leveraging
local disk on the hosts they are running on, is there any value to the local
data being preserved across restarts?

In other words, when a broker node fails and restarts on the same
machine, does it leverage any of the actual data it has on disk from its
prior life, or is it assumed to start fresh, being re-hydrated from the
other in-sync nodes in the running cluster? Same question for
zookeeper...

I'm trying to assess the value of local data preservation across
marathon-rescheduled jobs. I'm thinking the data is blown away, as much of it
would be stale, but I'm not sure, since log retentions could be set quite
high and having history could make re-sync more efficient by effectively
only closing the gap for deltas while down. Anyone know?


Kafka and zookeeper stores and mesos env

2016-12-08 Thread Mike Marzo
If I'm running a 5-node ZK cluster and a 3-node Kafka cluster in Docker on a
mesos/marathon environment, where my ZK and broker nodes are all leveraging
local disk on the hosts they are running on, is there any value to the local
data being preserved across restarts?

In other words, when a broker node fails and restarts on the same
machine, does it leverage any of the actual data it has on disk from its
prior life, or is it assumed to start fresh, being re-hydrated from the
other in-sync nodes in the running cluster? Same question for
zookeeper...

I'm trying to assess the value of local data preservation across
marathon-rescheduled jobs. I'm thinking the data is blown away, as much of it
would be stale, but I'm not sure, since log retentions could be set quite
high and having history could make re-sync more efficient by effectively
only closing the gap for deltas while down. Anyone know?


Re: KafkaStreams metadata - enum keys?

2016-12-08 Thread Jon Yeargers
Tried calling that - got this exception (FWIW - there isn't any other
instance)

State store value comes from

groupByKey().aggregate(LogLine::new,
new aggregate(),
TimeWindows.of(60 * 60 * 1000L),
collectorSerde, "minute_agg_stream");

2016-12-08 11:33:50,924 [qtp1318180415-18] DEBUG
o.eclipse.jetty.server.HttpChannel - Could not send response error 500,
already committed

javax.servlet.ServletException:
org.apache.kafka.streams.errors.InvalidStateStoreException: the state
store, minute_agg_stream, may have migrated to another instance.

at
org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)

at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)

at
org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)

at
org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)

at
org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)

at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)

at
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584)

at
org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:224)

at
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)

at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)

at
org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)

at
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)

at
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)

at
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)

at org.eclipse.jetty.server.Server.handle(Server.java:534)

at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)

at
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)

at
org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)

at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95)

at
org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)

at
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)

at
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)

at
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)

at
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)

at
org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)

at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: the
state store, minute_agg_stream, may have migrated to another instance.

at
org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:49)

at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)

at
com.cedexis.prtminuteagg.RestService.rangeForKeyValueStore(RestService.java:190)

at
com.cedexis.prtminuteagg.RestService.keyRangeForStore(RestService.java:99)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)

at
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)

at
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161)

at
org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:205)

at
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)

at
org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)

at
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)

at
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)

at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:326)

at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)

at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)

at org.glassfish.jersey.internal.Errors.process(Errors.java:315)

at org.glassfish.jersey.internal.Errors.process(Errors.java:297)

at org.glassfish.jersey.internal.Errors.process(Errors.java:267)

at
org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)

at 

Re: KafkaStreams metadata - enum keys?

2016-12-08 Thread Jon Yeargers
Maybe the 'rangeForKeyValueStore' function from the sample?

On Thu, Dec 8, 2016 at 2:55 AM, Jon Yeargers 
wrote:

> I see functions that require knowing a key name but in the interests of
> partitioning we're using fairly complex key structures (IE non-obvious to
> an external function).
>
> Is there a method / process for enumerating keys?
>


KafkaStreams metadata - enum keys?

2016-12-08 Thread Jon Yeargers
I see functions that require knowing a key name but in the interests of
partitioning we're using fairly complex key structures (IE non-obvious to
an external function).

Is there a method / process for enumerating keys?
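
For a plain key-value store, the interactive-queries interface does expose key
enumeration. A sketch, assuming the aggregate is materialized as a key-value
store ("some_store" and the String/Long types are placeholders); windowed
stores are more restricted and, as far as I recall, only support fetching a
known key over a time range:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public final class KeyEnumeration {
    static void dumpKeys(KafkaStreams streams) {
        ReadOnlyKeyValueStore<String, Long> store =
                streams.store("some_store", QueryableStoreTypes.<String, Long>keyValueStore());
        // all() walks every key held by *this* instance; other instances hold the rest.
        try (KeyValueIterator<String, Long> it = store.all()) {
            while (it.hasNext()) {
                KeyValue<String, Long> entry = it.next();
                System.out.println(entry.key + " -> " + entry.value);
            }
        }
    }
}

range(from, to) works the same way if only a slice of comparable keys is
needed.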


Re: Consumer poll - no results

2016-12-08 Thread Harald Kirsch
auto.offset.reset is honoured if the consumer group has not committed 
offsets yet, or if the offsets expired (I think this is 
offsets.retention.*).


Otherwise the last committed offsets should be read for that group.
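
Put differently, in new-consumer terms (the broker address and group id are
just placeholders):

import java.util.Properties;

public class OffsetResetExample {
    static Properties props() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");   // placeholder
        p.put("group.id", "DemoConsumer");
        // auto.offset.reset is only consulted when this group has no committed
        // offset for a partition (first start, or the committed offset expired).
        // On every later start the consumer resumes from the committed offset.
        p.put("auto.offset.reset", "earliest");
        p.put("enable.auto.commit", "true");
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }
}

To deliberately re-read everything on each start, use a fresh group.id or seek
back to the beginning of the assigned partitions after subscribing.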

Harald.

On 07.12.2016 18:48, Mohit Anchlia wrote:

Is auto.offset.reset honored just the first time the consumer starts
polling? In other words, every time the consumer starts, does it start from the
beginning even if it has already read those messages?

On Wed, Dec 7, 2016 at 1:43 AM, Harald Kirsch 
wrote:


Have you defined

auto.offset.reset: earliest

or otherwise made sure (KafkaConsumer.position()) that the consumer does
not just wait for *new* messages to arrive?

Harald.



On 06.12.2016 20:11, Mohit Anchlia wrote:


I see this message in the logs:

[2016-12-06 13:54:16,586] INFO [GroupCoordinator 0]: Preparing to
restabilize group DemoConsumer with old generation 3
(kafka.coordinator.GroupCoordinator)



On Tue, Dec 6, 2016 at 10:53 AM, Mohit Anchlia 
wrote:

I have a consumer polling a topic on Kafka 0.10. Even though the topic has
messages, the consumer poll is not fetching them. The thread dump
reveals:

"main" #1 prio=5 os_prio=0 tid=0x7f3ba4008800 nid=0x798 runnable
[0x7f3baa6c3000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.
java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0006c6d1f8b8> (a sun.nio.ch.Util$3)
- locked <0x0006c6d1f8a8> (a java.util.Collections$
UnmodifiableSet)
- locked <0x0006c6d1f0b8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(
Selector.java:470)
at org.apache.kafka.common.network.Selector.poll(
Selector.java:286)
at org.apache.kafka.clients.NetworkClient.poll(
NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.
ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
- locked <0x0006c6d1eff8> (a org.apache.kafka.clients.
consumer.internals.ConsumerNetworkClient)
at org.apache.kafka.clients.consumer.KafkaConsumer.
pollOnce(KafkaConsumer.java:1031)








Re: Problem with multiple joins in one topology

2016-12-08 Thread Matthias J. Sax
Hi Brian,

Sorry for your headache. We are aware that the current join semantics in
Streams are not straightforward.

We have already reworked those in trunk, and this change will be included in
the next release, 0.10.2. Please build from trunk and let us know if this
resolves your issue.

For details, see this wiki page explaining current and new join semantics:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics

For more details see the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-77%3A+Improve+Kafka+Streams+Join+Semantics


Long story short: leftJoin only triggers a join computation for the left
input while records of the right input only update the (right) input
KTable but do not compute a join result. Thus, if your right input data
arrives before your left input data it works -- however, if your left
input data arrives first, it will not enrich the stream but join with
"null".


-Matthias

On 12/7/16 9:25 AM, Brian Krahmer wrote:
> Hey guys,
> 
>   I'm having a hell of a time here.  I've worked for days trying to get
> this joining pipeline working.  I thought I had it working last week,
> but my jubilation was premature.  The point was to take data in from
> five different topics and merge them together to obtain one enriched
> event (output to compacted topic).  Can anybody spot what I'm doing
> wrong?  The ordering makes no difference.  For example, I've switched
> the locationInput and the vehicleReservedInput inputs in the leftJoin
> calls below, and I get the same results.  The location part of the
> enrichment works while the vehicleReserved part does not.  I can't even
> think of how to restructure the topology without resorting to building
> my own lower-level topology.
> 
> thanks,
> brian
> 
> 
> KTable fleetInput =
> builder.table(Serdes.String(),
> vehicleFinderDataSerde, FLEET_TOPIC,
> VEHICLE_ENRICHER_FLEET_STORE);
> ...
> fleetInput.print("fleetInput");
> locationInput.print("locationInput");
> vehicleReservedInput.print("vehicleReservedInput");
> vehicleReleasedInput.print("vehicleReleasedInput");
> vehicleUsageEndedInput.print("vehicleUsageEndedInput");
> 
> KTable mergeStepOne =
> fleetInput.leftJoin(locationInput, VehicleFinderData::merge);
> mergeStepOne.print("mergeStepOne");
> KTable mergeStepTwo =
> mergeStepOne.leftJoin(vehicleReleasedInput, VehicleFinderData::merge);
> mergeStepTwo.print("mergeStepTwo");
> KTable mergeStepThree =
> mergeStepTwo.leftJoin(vehicleUsageEndedInput, VehicleFinderData::merge);
> mergeStepThree.print("mergeStepThree");
> KTable mergeStepFour =
> mergeStepThree.leftJoin(vehicleReservedInput, VehicleFinderData::merge);
> mergeStepFour.print("mergeStepFour");
> 
> ** Generate a location event **
> 
> [locationInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json
> value}<-null)
> Deserializing from topic VehicleEnricherFleetStore
> Merge operation called
> [mergeStepOne]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json
> value}<-null)
> Merge operation called
> [mergeStepTwo]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json
> value}<-null)
> Merge operation called
> [mergeStepThree]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json
> value}<-null)
> Merge operation called
> [mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json
> value}<-null)
> 
> ** New event correctly serialized **
> 
> ---
> 
> ** Generate a vehicleReserved event **
> 
> [vehicleReservedInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped
> json value}<-null)
> [mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , (null<-null)
> 
> ** NO EVENT **
> 
> 


