Re: Running cluster of stream processing application

2016-12-09 Thread Damian Guy
Hi Sachin,

What you have suggested will never happen. If there is only 1 partition
there will only ever be one consumer of that partition. So if you had 2
instances of your streams application, and only a single input partition,
only 1 instance would be processing the data.
If you are running like this, then you might want to set
StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG to 1 - this will mean that the
State Store that is generated by the aggregation is kept up to date on the
instance that is not processing the data. So in the event that the active
instance fails, the standby instance should be able to continue without too
much of a gap in processing time.
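
For reference, a minimal sketch of how that might look in the streams configuration (the application id and bootstrap servers below are placeholders, not from your setup):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// both instances must share the same application id (placeholder name)
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregation-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
// keep one standby replica of each state store on the otherwise idle instance
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);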

Thanks,
Damian

On Fri, 9 Dec 2016 at 04:55 Sachin Mittal  wrote:

> Hi,
> I followed the document and I have few questions.
> Say I have a single partition input key topic and say I run 2 streams
> application from machine1 and machine2.
> Both applications have the same application id and identical code.
> Say topic1 has messages like
> (k1, v11)
> (k1, v12)
> (k1, v13)
> (k2, v21)
> (k2, v22)
> (k2, v23)
> When I was running single application I was getting results like
> (k1, agg(v11, v12, v13))
> (k2, agg(v21, v22, v23))
>
> Now when 2 applications are run and say messages are read in round robin
> fashion.
> v11 v13 v22 - machine 1
> v12 v21 v23 - machine 2
>
> The aggregation at machine 1 would be
> (k1, agg(v11, v13))
> (k2, agg(v22))
>
> The aggregation at machine 2 would be
> (k1, agg(v12))
> (k2, agg(v21, v23))
>
> So now where do I join the independent results of these 2 aggregations to
> get the final result as expected when a single instance was running.
>
> Note my high level dsl is something like
> srcSTopic.aggregate(...).foreach(key, aggregation) {
> //process aggregated value and push it to some external storage
> }
>
> So I want this foreach to be running against the final set of aggregated
> values. Do I need to add another step before foreach to make sure the
> different results from 2 machines are joined to get the final one as
> expected. If yes, what would that step be?
>
> Thanks
> Sachin
>
>
>
>
>
>
> On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> > Hi Sachin,
> >
> > Some quick answers, and a link to some documentation to read more:
> >
> > - If you restart the application, it will start from the point it crashed
> > (possibly reprocessing a small window of records).
> >
> > - You can run more than one instance of the application.  They'll
> > coordinate by virtue of being part of a Kafka consumer group; if one
> > crashes, the partitions that it was reading from will be picked up by
> other
> > instances.
> >
> > - When running more than one instance, the tasks will be distributed
> > between the instances.
> >
> > Confluent's docs on the Kafka Streams architecture goes into a lot more
> > detail: http://docs.confluent.io/3.0.0/streams/architecture.html
> >
> >
> >
> >
> > On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal 
> wrote:
> >
> > > Hi All,
> > > We were able to run a stream processing application against a fairly
> > decent
> > > load of messages in production environment.
> > >
> > > To make the system robust say the stream processing application
> crashes,
> > is
> > > there a way to make it auto start from the point when it crashed?
> > >
> > > Also is there any concept like running the same application in a
> cluster,
> > > where one fails, other takes over, until we bring back up the failed
> node
> > > of streams application.
> > >
> > > If yes, is there any guidelines or some knowledge base we can look at
> to
> > > understand how this would work.
> > >
> > > Is there way like in spark, where the driver program distributes the
> > tasks
> > > across various nodes in a cluster, is there something similar in kafka
> > > streaming too.
> > >
> > > Thanks
> > > Sachin
> > >
> >
>


Re: kafka streams passes org.apache.kafka.streams.kstream.internals.Change to my app!!

2016-12-09 Thread Damian Guy
Hi Ara,

It is a bug in 0.10.1 that has been fixed:
https://issues.apache.org/jira/browse/KAFKA-4311
To work around it you should set
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0.
The fix is available on trunk and 0.10.1 branch and there will be a
0.10.1.1 release any day now.
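
For illustration, a rough sketch of that workaround in the streams configuration (only the buffering setting comes from this thread; the other properties are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // placeholder
// disable the record cache to avoid the KAFKA-4311 ClassCastException
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);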

Thanks,
Damian

On Fri, 9 Dec 2016 at 01:12 Ara Ebrahimi 
wrote:

> Hi,
>
> Once in a while and quite randomly this happens, but it does happen every
> few hundred thousand message:
>
> 2016-12-03 11:48:05 ERROR StreamThread:249 - stream-thread
> [StreamThread-4] Streams application error during processing:
> java.lang.ClassCastException:
> org.apache.kafka.streams.kstream.internals.Change cannot be cast to
> com.argyledata.streams.entity.Activity
> at com.argyledata.streams.StreamPipeline$$Lambda$14/33419717.apply(Unknown
> Source)
> at
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
> org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
> at
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
> at
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:199)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
> at
> org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateValueGetter.get(KStreamAggregate.java:112)
> at
> org.apache.kafka.streams.kstream.internals.KStreamKTableLeftJoin$KStreamKTableLeftJoinProcessor.process(KStreamKTableLeftJoin.java:61)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>
> Has anyone else seen this weird problem?
>
> Ara.
>
>
>
> 
>
> This message is for the designated recipient only and may contain
> privileged, proprietary, or otherwise confidential information. If you have
> received it in error, please notify the sender immediately and delete the
> original. Any other use of the e-mail by you is prohibited. Thank you in
> advance for your cooperation.
>
> 
>


Re: controlling memory growth when aggregating

2016-12-09 Thread Damian Guy
Hi Jon,

Are you using 0.10.1? There is a resource leak to do with the Window
Iterator. The bug has been fixed on the 0.10.1 branch (which will soon be
released as 0.10.1.1)
and it is also fixed in the confluent fork.

You can get the confluent version by using the following:



<repositories>
  <repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>0.10.1.0-cp2</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.1.0-cp2</version>
</dependency>


On Thu, 8 Dec 2016 at 23:37 Jon Yeargers  wrote:

I'm working with JSON data that has an array member. I'm aggregating values
into this using minute-long windows.

I ran the app for ~10 minutes and watched it consume 40% of the memory on a
box with 32G. It was still growing when I stopped it. At this point it had
created ~800 values each of which was < 1Mb in size (owing to the
limitations on message size set at the broker). (I wrote all the values
into Redis so I could count them and check the aggregation).

1. Why is it consuming so much memory?
2. Is there a strategy for controlling this growth?

I get that it's keeping every window open in case a new value shows up.
Maybe some way to relax this using event time vs clock time?


Deleting a topic without delete.topic.enable=true?

2016-12-09 Thread Tim Visher
Hi Everyone,

I'm really confused at the moment. We created a topic with brokers set to
delete.topic.enable=false.

We now need to delete that topic. To do that we shut down all the brokers,
deleted everything under log.dirs and logs.dir on all the kafka brokers,
`rmr`ed the entire chroot that kafka was storing things under in zookeeper,
and then brought kafka back up.

After doing all that, the topic comes back, every time.

What can we do to delete that topic?

--

In Christ,

Timmy V.

http://blog.twonegatives.com/
http://five.sentenc.es/ -- Spend less time on mail


Error starting kafka server.

2016-12-09 Thread Sai Karthi Golagani
Hi team,
I've recently set up a Kafka server on an edge node and ZooKeeper on 3 separate
hosts. I'm using ZooKeeper as a client at port 5181. Since I have 3 hosts
running ZooKeeper, I'm using a znode to connect to ZooKeeper from my Kafka host.
While starting the Kafka server, I'm getting the following exception: "FATAL
[Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown
(kafka.server.KafkaServer)"
I'm attaching the server and zookeeper .properties files along with the
server.log. Please do the needful.

Kafka version: 0.9.0
ZK version: 3.4.5


Re: Deleting a topic without delete.topic.enable=true?

2016-12-09 Thread Rodrigo Sandoval
Why did you do all those things instead of just setting
delete.topic.enable=true?

On Dec 9, 2016 13:40, "Tim Visher"  wrote:

> Hi Everyone,
>
> I'm really confused at the moment. We created a topic with brokers set to
> delete.topic.enable=false.
>
> We now need to delete that topic. To do that we shut down all the brokers,
> deleted everything under log.dirs and logs.dir on all the kafka brokers,
> `rmr`ed the entire chroot that kafka was storing things under in zookeeper,
> and then brought kafka back up.
>
> After doing all that, the topic comes back, every time.
>
> What can we do to delete that topic?
>
> --
>
> In Christ,
>
> Timmy V.
>
> http://blog.twonegatives.com/
> http://five.sentenc.es/ -- Spend less time on mail
>


Re: Deleting a topic without delete.topic.enable=true?

2016-12-09 Thread Ali Akhtar
You need to also delete / restart zookeeper, it's probably storing the
topics there. (Or yeah, just enable it and then delete the topic)

On Fri, Dec 9, 2016 at 9:09 PM, Rodrigo Sandoval  wrote:

> Why did you do all those things instead of just setting
> delete.topic.enable=true?
>
> On Dec 9, 2016 13:40, "Tim Visher"  wrote:
>
> > Hi Everyone,
> >
> > I'm really confused at the moment. We created a topic with brokers set to
> > delete.topic.enable=false.
> >
> > We now need to delete that topic. To do that we shut down all the
> brokers,
> > deleted everything under log.dirs and logs.dir on all the kafka brokers,
> > `rmr`ed the entire chroot that kafka was storing things under in
> zookeeper,
> > and then brought kafka back up.
> >
> > After doing all that, the topic comes back, every time.
> >
> > What can we do to delete that topic?
> >
> > --
> >
> > In Christ,
> >
> > Timmy V.
> >
> > http://blog.twonegatives.com/
> > http://five.sentenc.es/ -- Spend less time on mail
> >
>


internals.AbstractCoordinator {} - Marking the coordinator ip-xyz:9092 (id: 2144 rack: null) dead for group MyTopic

2016-12-09 Thread shahab
Hello,

I am using Kafka 1.0.1 and on the Kafka consumer side I am regularly (every
10-20 min) seeing the following Kafka log message, which causes the consumer's
partition assignment to be revoked and reassigned:

Consumer side logs: Marking the coordinator ip-XYZ:9092 (id: 2147482644
rack: null) dead for group MyGroup

Does anyone know the source of this issue?

I played with CONNECTIONS_MAX_IDLE_MS_CONFIG in the consumer-side Kafka
configuration and it didn't affect the results.

best,
Shahab

Here is the related logs I found in consumer side:
2016-12-08 20:41:12.559 INFO  internals.AbstractCoordinator {} - Marking
the coordinator ip-XYZ:9092 (id: 2147482644 rack: null) dead for group
MyGroup
2016-12-08 20:41:12.560 WARN  internals.ConsumerCoordinator {} - Auto
offset commit failed for group  MyGroup: Commit offsets failed with
retriable exception. You should retry committing offsets.
2016-12-08 20:41:12.916 INFO  internals.AbstractCoordinator {} - Discovered
coordinator ip-xyz:9092 (id: 2147482644 rack: null) for group MyGroup.
2016-12-08 22:59:28.310 INFO  internals.ConsumerCoordinator {} - Revoking
previously assigned partitions [topic-1,topic-2,topic-3,topic-4] for group
MyGroup
2016-12-08 22:59:28.310 INFO  internals.AbstractCoordinator {} -
(Re-)joining group MyGroup
2016-12-08 22:59:28.343 INFO  internals.AbstractCoordinator {} -
Successfully joined group MyGroup with generation 86
2016-12-08 22:59:28.343 INFO  internals.ConsumerCoordinator {} - Setting
newly assigned partitions [topic-1,topic-2,topic-3,topic-4] for group MyGroup


Re: Deleting a topic without delete.topic.enable=true?

2016-12-09 Thread Tim Visher
I did all of that because setting delete.topic.enable=true wasn't
effective. We set that across every broker, restarted them, and then
deleted the topic, and it was still stuck in existence.

On Fri, Dec 9, 2016 at 11:11 AM, Ali Akhtar  wrote:

> You need to also delete / restart zookeeper, its probably storing the
> topics there. (Or yeah, just enable it and then delete the topic)
>
> On Fri, Dec 9, 2016 at 9:09 PM, Rodrigo Sandoval <
> rodrigo.madfe...@gmail.com
> > wrote:
>
> > Why did you do all those things instead of just setting
> > delete.topic.enable=true?
> >
> > On Dec 9, 2016 13:40, "Tim Visher"  wrote:
> >
> > > Hi Everyone,
> > >
> > > I'm really confused at the moment. We created a topic with brokers set
> to
> > > delete.topic.enable=false.
> > >
> > > We now need to delete that topic. To do that we shut down all the
> > brokers,
> > > deleted everything under log.dirs and logs.dir on all the kafka
> brokers,
> > > `rmr`ed the entire chroot that kafka was storing things under in
> > zookeeper,
> > > and then brought kafka back up.
> > >
> > > After doing all that, the topic comes back, every time.
> > >
> > > What can we do to delete that topic?
> > >
> > > --
> > >
> > > In Christ,
> > >
> > > Timmy V.
> > >
> > > http://blog.twonegatives.com/
> > > http://five.sentenc.es/ -- Spend less time on mail
> > >
> >
>


Re: Deleting a topic without delete.topic.enable=true?

2016-12-09 Thread Todd Palino
Given that you said you removed the log directories, and provided that when
you did the rmr on Zookeeper it was to the “/brokers/topics/(topic name)”
path, you did the right things for a manual deletion. It sounds like you
may have a consumer (or other client) that is recreating the topic. Do you
have auto topic creation enabled?
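
For reference, the broker settings in play here are delete.topic.enable and auto.create.topics.enable. A sketch of how they would typically look in server.properties (illustrative only; confirm against your own cluster config):

# allow topics to be deleted via the admin tooling
delete.topic.enable=true
# stop clients from silently recreating topics via metadata/produce requests
auto.create.topics.enable=false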

-Todd


On Fri, Dec 9, 2016 at 8:25 AM, Tim Visher  wrote:

> I did all of that because setting delete.topic.enable=true wasn't
> effective. We set that across every broker, restarted them, and then
> deleted the topic, and it was still stuck in existence.
>
> On Fri, Dec 9, 2016 at 11:11 AM, Ali Akhtar  wrote:
>
> > You need to also delete / restart zookeeper, its probably storing the
> > topics there. (Or yeah, just enable it and then delete the topic)
> >
> > On Fri, Dec 9, 2016 at 9:09 PM, Rodrigo Sandoval <
> > rodrigo.madfe...@gmail.com
> > > wrote:
> >
> > > Why did you do all those things instead of just setting
> > > delete.topic.enable=true?
> > >
> > > On Dec 9, 2016 13:40, "Tim Visher"  wrote:
> > >
> > > > Hi Everyone,
> > > >
> > > > I'm really confused at the moment. We created a topic with brokers
> set
> > to
> > > > delete.topic.enable=false.
> > > >
> > > > We now need to delete that topic. To do that we shut down all the
> > > brokers,
> > > > deleted everything under log.dirs and logs.dir on all the kafka
> > brokers,
> > > > `rmr`ed the entire chroot that kafka was storing things under in
> > > zookeeper,
> > > > and then brought kafka back up.
> > > >
> > > > After doing all that, the topic comes back, every time.
> > > >
> > > > What can we do to delete that topic?
> > > >
> > > > --
> > > >
> > > > In Christ,
> > > >
> > > > Timmy V.
> > > >
> > > > http://blog.twonegatives.com/
> > > > http://five.sentenc.es/ -- Spend less time on mail
> > > >
> > >
> >
>



-- 
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Deleting a topic without delete.topic.enable=true?

2016-12-09 Thread Tim Visher
On Fri, Dec 9, 2016 at 11:34 AM, Todd Palino  wrote:

> Given that you said you removed the log directories, and provided that when
> you did the rmr on Zookeeper it was to the “/brokers/topics/(topic name)”
> path, you did the right things for a manual deletion. It sounds like you
> may have a consumer (or other client) that is recreating the topic. Do you
> have auto topic creation enabled?
>

That was the last epiphany we had. We had shut down the producer but not
all the consumers and we do allow auto-topic creation.

That said, we then proceeded to shut all of them down (the consumers) and
the topic came back. I'm glad that we were doing the right steps though.

>
> -Todd
>
>
> On Fri, Dec 9, 2016 at 8:25 AM, Tim Visher  wrote:
>
> > I did all of that because setting delete.topic.enable=true wasn't
> > effective. We set that across every broker, restarted them, and then
> > deleted the topic, and it was still stuck in existence.
> >
> > On Fri, Dec 9, 2016 at 11:11 AM, Ali Akhtar 
> wrote:
> >
> > > You need to also delete / restart zookeeper, its probably storing the
> > > topics there. (Or yeah, just enable it and then delete the topic)
> > >
> > > On Fri, Dec 9, 2016 at 9:09 PM, Rodrigo Sandoval <
> > > rodrigo.madfe...@gmail.com
> > > > wrote:
> > >
> > > > Why did you do all those things instead of just setting
> > > > delete.topic.enable=true?
> > > >
> > > > On Dec 9, 2016 13:40, "Tim Visher"  wrote:
> > > >
> > > > > Hi Everyone,
> > > > >
> > > > > I'm really confused at the moment. We created a topic with brokers
> > set
> > > to
> > > > > delete.topic.enable=false.
> > > > >
> > > > > We now need to delete that topic. To do that we shut down all the
> > > > brokers,
> > > > > deleted everything under log.dirs and logs.dir on all the kafka
> > > brokers,
> > > > > `rmr`ed the entire chroot that kafka was storing things under in
> > > > zookeeper,
> > > > > and then brought kafka back up.
> > > > >
> > > > > After doing all that, the topic comes back, every time.
> > > > >
> > > > > What can we do to delete that topic?
> > > > >
> > > > > --
> > > > >
> > > > > In Christ,
> > > > >
> > > > > Timmy V.
> > > > >
> > > > > http://blog.twonegatives.com/
> > > > > http://five.sentenc.es/ -- Spend less time on mail
> > > > >
> > > >
> > >
> >
>
>
>
> --
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino
>


Re: Deleting a topic without delete.topic.enable=true?

2016-12-09 Thread Vahid S Hashemian
Any chance the consumer process that consumes from that topic is still 
running while you are doing all this?

--Vahid



From:   Tim Visher 
To: users@kafka.apache.org
Date:   12/09/2016 08:26 AM
Subject:Re: Deleting a topic without delete.topic.enable=true?



I did all of that because setting delete.topic.enable=true wasn't
effective. We set that across every broker, restarted them, and then
deleted the topic, and it was still stuck in existence.

On Fri, Dec 9, 2016 at 11:11 AM, Ali Akhtar  wrote:

> You need to also delete / restart zookeeper, its probably storing the
> topics there. (Or yeah, just enable it and then delete the topic)
>
> On Fri, Dec 9, 2016 at 9:09 PM, Rodrigo Sandoval <
> rodrigo.madfe...@gmail.com
> > wrote:
>
> > Why did you do all those things instead of just setting
> > delete.topic.enable=true?
> >
> > On Dec 9, 2016 13:40, "Tim Visher"  wrote:
> >
> > > Hi Everyone,
> > >
> > > I'm really confused at the moment. We created a topic with brokers 
set
> to
> > > delete.topic.enable=false.
> > >
> > > We now need to delete that topic. To do that we shut down all the
> > brokers,
> > > deleted everything under log.dirs and logs.dir on all the kafka
> brokers,
> > > `rmr`ed the entire chroot that kafka was storing things under in
> > zookeeper,
> > > and then brought kafka back up.
> > >
> > > After doing all that, the topic comes back, every time.
> > >
> > > What can we do to delete that topic?
> > >
> > > --
> > >
> > > In Christ,
> > >
> > > Timmy V.
> > >
> > > http://blog.twonegatives.com/
> > > http://five.sentenc.es/ -- Spend less time on mail
> > >
> >
>






Re: log.retention.hours not working?

2016-12-09 Thread Matthias J. Sax
Cross posting from SO. The exact same question. See my answer there:

http://stackoverflow.com/questions/41048041/kafka-deletes-segments-even-before-segment-size-is-reached/41065100#comment69338104_41065100


-Matthias

On 12/8/16 8:43 PM, Rodrigo Sandoval wrote:
> This is what Tood said:
> 
> "Retention is going to be based on a combination of both the retention and
> segment size settings (as a side note, it's recommended to use
> log.retention.ms and log.segment.ms, not the hours config. That's there for
> legacy reasons, but the ms configs are more consistent). As messages are
> received by Kafka, they are written to the current open log segment for
> each partition. That segment is rotated when either the log.segment.bytes
> or the log.segment.ms limit is reached. Once that happens, the log segment
> is closed and a new one is opened. Only after a log segment is closed can
> it be deleted via the retention settings. Once the log segment is closed
> AND either all the messages in the segment are older than log.retention.ms
> OR the total partition size is greater than log.retention.bytes, then the
> log segment is purged.
> 
> As a note, the default segment limit is 1 gibibyte. So if you've only
> written in 1k of messages, you have a long way to go before that segment
> gets rotated. This is why the retention is referred to as a minimum time.
> You can easily retain much more than you're expecting for slow topics."
> 
> On Dec 9, 2016 02:38, "Rodrigo Sandoval"  wrote:
> 
>> Your understanding about segment.bytes and retention.ms is correct. But
>> Tood Palino said just after having reached the segment size, that is when
>> the segment is "closed"  PLUS all messages within the segment that was
>> closed are older than the retention policy defined ( in this case
>> retention.ms) THEN delete the segment.
>>
>> At least according to my testing, it is not necessary to wait until the
>> segment is closed to delete it. Simply if all messages in a segment ( no
>> matter if the segment reached the size defined by segment.bytes) are older
>> than the policy defined by retention.ms , THEN delete the segment.
>>
>> I have been playing a lot today with kafka, and at least that is what I
>> figured out.
>>
>> On Dec 9, 2016 02:13, "Sachin Mittal"  wrote:
>>
>>> I think segment.bytes defines the size of single log file before creating
>>> a
>>> new one.
>>> retention.ms defines number of ms to wait on a log file before deleting
>>> it.
>>>
>>> So it is working as defined in docs.
>>>
>>>
>>> On Fri, Dec 9, 2016 at 2:42 AM, Rodrigo Sandoval <
>>> rodrigo.madfe...@gmail.com
 wrote:
>>>
 How is that about that when the segment size is reached, plus every
>>> single
 message inside the segment is older than the retention time, then the
 segment will be deleted?


 I have been playing with Kafka and I have the following:

 bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
 --config retention.ms=6

 bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
 --config file.delete.delay.ms=4

 bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1
 --config segment.bytes=40

 My understanding according to your thoughts is a segment will be deleted
 when the segment reaches out the segment size above defined
 (segment.bytes=40) PLUS every single message within the segment is
 older than the retention time above defined (retention.ms=6).

 What I noticed is a segment of just 35 bytes, which conteined just one
 message, was deleted after the minute (maybe a little more). Therefore,
>>> the
 segment size was not met in order to delete it.

>>>
>>
> 





Re: Running cluster of stream processing application

2016-12-09 Thread Matthias J. Sax
About failure and restart. Kafka Streams does not provide any tooling
for this. It's a library.

However, because it is a library it is also agnostic to whatever tool
you want to use. You can for example use a resource manager like Mesos
or YARN, or you containerize your application, or you use tools like
Chef. And there is a bunch more -- pick whatever fits your needs best.

-Matthias


On 12/9/16 12:04 AM, Damian Guy wrote:
> Hi Sachin,
> 
> What you have suggested will never happen. If there is only 1 partition
> there will only ever be one consumer of that partition. So if you had 2
> instances of your streams application, and only a single input partition,
> only 1 instance would be processing the data.
> If you are running like this, then you might want to set
> StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG to 1 - this will mean that the
> State Store that is generated by the aggregation is kept up to date on the
> instance that is not processing the data. So in the event that the active
> instance fails, the standby instance should be able to continue without too
> much of a gap in processing time.
> 
> Thanks,
> Damian
> 
> On Fri, 9 Dec 2016 at 04:55 Sachin Mittal  wrote:
> 
>> Hi,
>> I followed the document and I have few questions.
>> Say I have a single partition input key topic and say I run 2 streams
>> application from machine1 and machine2.
>> Both applications have the same application id and identical code.
>> Say topic1 has messages like
>> (k1, v11)
>> (k1, v12)
>> (k1, v13)
>> (k2, v21)
>> (k2, v22)
>> (k2, v23)
>> When I was running single application I was getting results like
>> (k1, agg(v11, v12, v13))
>> (k2, agg(v21, v22, v23))
>>
>> Now when 2 applications are run and say messages are read in round robin
>> fashion.
>> v11 v13 v22 - machine 1
>> v12 v21 v23 - machine 2
>>
>> The aggregation at machine 1 would be
>> (k1, agg(v11, v13))
>> (k2, agg(v22))
>>
>> The aggregation at machine 2 would be
>> (k1, agg(v12))
>> (k2, agg(v21, v23))
>>
>> So now where do I join the independent results of these 2 aggregations to
>> get the final result as expected when a single instance was running.
>>
>> Note my high level dsl is something like
>> srcSTopic.aggregate(...).foreach(key, aggregation) {
>> //process aggregated value and push it to some external storage
>> }
>>
>> So I want this foreach to be running against the final set of aggregated
>> values. Do I need to add another step before foreach to make sure the
>> different results from 2 machines are joined to get the final one as
>> expected. If yes, what would that step be?
>>
>> Thanks
>> Sachin
>>
>>
>>
>>
>>
>>
>> On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <
>> mathieu.fenn...@replicon.com> wrote:
>>
>>> Hi Sachin,
>>>
>>> Some quick answers, and a link to some documentation to read more:
>>>
>>> - If you restart the application, it will start from the point it crashed
>>> (possibly reprocessing a small window of records).
>>>
>>> - You can run more than one instance of the application.  They'll
>>> coordinate by virtue of being part of a Kafka consumer group; if one
>>> crashes, the partitions that it was reading from will be picked up by
>> other
>>> instances.
>>>
>>> - When running more than one instance, the tasks will be distributed
>>> between the instances.
>>>
>>> Confluent's docs on the Kafka Streams architecture goes into a lot more
>>> detail: http://docs.confluent.io/3.0.0/streams/architecture.html
>>>
>>>
>>>
>>>
>>> On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal 
>> wrote:
>>>
 Hi All,
 We were able to run a stream processing application against a fairly
>>> decent
 load of messages in production environment.

 To make the system robust say the stream processing application
>> crashes,
>>> is
 there a way to make it auto start from the point when it crashed?

 Also is there any concept like running the same application in a
>> cluster,
 where one fails, other takes over, until we bring back up the failed
>> node
 of streams application.

 If yes, is there any guidelines or some knowledge base we can look at
>> to
 understand how this would work.

 Is there way like in spark, where the driver program distributes the
>>> tasks
 across various nodes in a cluster, is there something similar in kafka
 streaming too.

 Thanks
 Sachin

>>>
>>
> 





Re: Kafka windowed table not aggregating correctly

2016-12-09 Thread Guozhang Wang
Assuming your window retention period is the same as the window length,
then it is true that ZZ will cause the current window to be dropped. And
then when ZZA is received, it will not cause the old windows to be
re-created but will be ignored since it is considered "expired".

Note that you can set the window retention period much longer than the
window length itself, using the "until" API I mentioned above to handle any
sudden future records.
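
For illustration, a rough sketch using the windows API quoted earlier in this thread (the window name and the 7-day retention are example values only, not a recommendation):

// 10-second hopping windows, retained for 7 days so that late or
// out-of-order records can still update their (older) windows
TimeWindows.of("stream-table", 10 * 1000L)
    .advanceBy(5 * 1000L)
    .until(7 * 24 * 3600 * 1000L);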



Guozhang

On Thu, Dec 8, 2016 at 8:19 PM, Sachin Mittal  wrote:

> Hi,
> Right now in order to circumvent this problem I am using a timestamp whose
> values increase by few ms as and when I get new records.
> So lets say I have records in order
> A -> lower limit TS + 1 sec
> B -> lower limit TS + 3 sec
> C -> lower limit TS + 5 sec
> ..
> Z -> upper limit TS - 1 sec
>
> Now say I get a record ZZ with ts upper limit TS + 1 sec I assume it will
> drop the previous windows and create new ones based on this timestamp.
> Please confirm this understanding.
>
> Now lets say I get new record ZZA with timestamp (old) upper limit TS - 1
> sec, will this again cause new windows to be dropped and recreate older
> windows fresh with all the older aggregation done so far lost?
>
> Thanks
> Sachin
>
>
>
>
> On Fri, Dec 9, 2016 at 12:16 AM, Guozhang Wang  wrote:
>
> > Hello Sachin,
> >
> > I am with you that ideally the windowing segmentation implementation
> should
> > be totally abstracted from users but today it is a bit confusing to
> > understand. I have filed JIRA some time ago to improve on this end:
> >
> > https://issues.apache.org/jira/browse/KAFKA-3596
> >
> > So to your example, if a "far future record" was received whose timestamp
> > is beyond current time + the retention period, it could potentially cause
> > the current window to be dropped.
> >
> >
> > Guozhang
> >
> >
> > On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal 
> wrote:
> >
> > > Hi,
> > > I think now it makes all the sense. The field I was using for timestamp
> > > extractor contains timestamps which spans for greater than a day's
> > duration
> > > and it worked for wall clock because for short duration timestamps were
> > in
> > > day's range.
> > >
> > > I wanted to understand one thing:
> > > Say I have a timestamp extractor field and as record gets ingested
> future
> > > records will have increasing values for the timestamp.
> > >
> > > Now lets say default duration is one day. At a future time a record
> will
> > > have timestamp which now is greater than the initial day's range.
> > > What will happen then, it will create a new segment and then create
> > windows
> > > in it for the next day's duration?
> > > What happens if now it gets a record from the previous day, will it get
> > > discarded or will it again have just the single value aggregated in it
> > > (previous values are lost).
> > > So when new segment is create as I understand does it retain the older
> > > segments data.
> > >
> > > This is bit confusing, so would be helpful if you can explain in bit
> more
> > > detail.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang 
> > wrote:
> > >
> > > > Sachin,
> > > >
> > > > One thing to note is that the retention of the windowed stores works
> by
> > > > keeping multiple segments of the stores where each segments stores a
> > time
> > > > range which can potentially span multiple windows, if a new window
> > needs
> > > to
> > > > be created that is further from the oldest segment's time range +
> > > retention
> > > > period (from your code it seems you do not override it from
> > > > TimeWindows.of("stream-table",
> > > > 10 * 1000L).advanceBy(5 * 1000L), via until(...)), so the default of
> > one
> > > > day is used.
> > > >
> > > > So with WallclockTimeExtractor since it is using system time, it wont
> > > give
> > > > you timestamps that span for more than a day during a short period of
> > > time,
> > > > but if your own defined timestamps expand that value, then old
> segments
> > > > will be dropped immediately and hence the aggregate values will be
> > > returned
> > > > as a single value.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <
> > matth...@confluent.io>
> > > > wrote:
> > > >
> > > > > The extractor is used in
> > > > >
> > > > > org.apache.kafka.streams.processor.internals.
> > > RecordQueue#addRawRecords()
> > > > >
> > > > > Let us know, if you could resolve the problem or need more help.
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > > > > > https://github.com/SOHU-Co/kafka-node/ this is the node js
> client
> > i
> > > am
> > > > > > using. The version is 0.5x. Can you please tell me what code in
> > > streams
> > > > > > calls the timestamp extractor. I can look there to see if there
> is
> > > any
> > > > > > issue.
> > > > > >
> > > > > > Again issue happens only when producing the messages using
> producer
> >

Re: controlling memory growth when aggregating

2016-12-09 Thread Jon Yeargers
Perhaps that's the problem. Yes - I'm still using 0.10.1.0.

Does this involve a broker update?

On Fri, Dec 9, 2016 at 7:12 AM, Damian Guy  wrote:

> Hi Jon,
>
> Are you using 0.10.1? There is a resource leak to do with the Window
> Iterator. The bug has been fixed on the 0.10.1 branch (which will soon be
> released as 0.10.1.1)
> and it is also fixed in the confluent fork.
>
> You can get the confluent version by using the following:
>
> <repositories>
>   <repository>
>     <id>confluent</id>
>     <url>http://packages.confluent.io/maven/</url>
>   </repository>
> </repositories>
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-streams</artifactId>
>   <version>0.10.1.0-cp2</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>0.10.1.0-cp2</version>
> </dependency>
>
> On Thu, 8 Dec 2016 at 23:37 Jon Yeargers  wrote:
>
> I'm working with JSON data that has an array member. I'm aggregating values
> into this using minute-long windows.
>
> I ran the app for ~10 minutes and watched it consume 40% of the memory on a
> box with 32G. It was still growing when I stopped it. At this point it had
> created ~800 values each of which was < 1Mb in size (owing to the
> limitations on message size set at the broker). (I wrote all the values
> into Redis so I could count them and check the aggregation).
>
> 1. Why is it consuming so much memory?
> 2. Is there a strategy for controlling this growth?
>
> I get that it's keeping every window open in case a new value shows up.
> Maybe some way to relax this using event time vs clock time?
>


NotEnoughReplication

2016-12-09 Thread Mohit Anchlia
What's the best way to fix NotEnoughReplication given all the nodes are up
and running? Zookeeper did go down momentarily. We are on Kafka 0.10

org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync
replicas for partition [__consumer_offsets,20] is [1], below required
minimum [2]


Re: controlling memory growth when aggregating

2016-12-09 Thread Jon Yeargers
I updated my consumer to that build. The memory issue seems to have abated.
TY!

Have started seeing this exception semi-regularly though:

ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$1 for group
MinuteAgg failed on partition assignment

java.lang.IllegalStateException: task [1_4] Log end offset should not
change while restoring

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)

at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)

at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)

at
org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)

at
org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)

at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)

at
org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:120)

at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)

at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)

at
org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)

at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)

at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

On Fri, Dec 9, 2016 at 4:29 PM, Jon Yeargers 
wrote:

> Perhaps that's the problem. Yes - I'm still using 0.10.1.0.
>
> Does this involve a broker update?
>
> On Fri, Dec 9, 2016 at 7:12 AM, Damian Guy  wrote:
>
>> Hi Jon,
>>
>> Are you using 0.10.1? There is a resource leak to do with the Window
>> Iterator. The bug has been fixed on the 0.10.1 branch (which will soon be
>> released as 0.10.1.1)
>> and it is also fixed in the confluent fork.
>>
>> You can get the confluent version by using the following:
>>
>> <repositories>
>>   <repository>
>>     <id>confluent</id>
>>     <url>http://packages.confluent.io/maven/</url>
>>   </repository>
>> </repositories>
>>
>> <dependency>
>>   <groupId>org.apache.kafka</groupId>
>>   <artifactId>kafka-streams</artifactId>
>>   <version>0.10.1.0-cp2</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.kafka</groupId>
>>   <artifactId>kafka-clients</artifactId>
>>   <version>0.10.1.0-cp2</version>
>> </dependency>
>>
>>
>> On Thu, 8 Dec 2016 at 23:37 Jon Yeargers 
>> wrote:
>>
>> I'm working with JSON data that has an array member. I'm aggregating values
>> into this using minute-long windows.
>>
>> I ran the app for ~10 minutes and watched it consume 40% of the memory on
>> a
>> box with 32G. It was still growing when I stopped it. At this point it had
>> created ~800 values each of which was < 1Mb in size (owing to the
>> limitations on message size set at the broker). (I wrote all the values
>> into Redis so I could count them and check the aggregation).
>>
>> 1. Why is it consuming so much memory?
>> 2. Is there a strategy for controlling this growth?
>>
>> I get that it's keeping every window open in case a new value shows up.
>> Maybe some way to relax this using event time vs clock time?
>>
>
>


checking consumer lag on KStreams app?

2016-12-09 Thread Jon Yeargers
How would this be done?


lag for a specific partitions on newly added host

2016-12-09 Thread Jeremy Hansen
I added a new host to kafka.  Partitions that fall on this new host have a very 
high lag and I’m trying to understand why this would be and how to fix it.

Group           Topic     Pid  Offset    logSize   Lag      Owner
iloveconsuming  blumfrub  0    5434682   7416933   1982251  iloveconsuming_kf0001.host.com-1481225033576-47393b55-0
iloveconsuming  blumfrub  1    7416828   7416875   47       iloveconsuming_kf0001.host.com-1481225033769-17152bca-0
iloveconsuming  blumfrub  2    7416799   7416848   49       iloveconsuming_kf0001.host.com-1481225033791-77a30285-0
iloveconsuming  blumfrub  3    7416898   7416903   5        iloveconsuming_kf0001.host.com-1481225033822-d088f844-0
iloveconsuming  blumfrub  4    7416891   7416925   34       iloveconsuming_kf0001.host.com-1481225033846-78f8e5b5-0
iloveconsuming  blumfrub  5    7416843   7416883   40       iloveconsuming_kf0001.host.com-1481225033869-54027178-0
iloveconsuming  blumfrub  6    5434720   7416896   1982176  iloveconsuming_kf0001.host.com-1481225033891-cc3f6bf6-0
iloveconsuming  blumfrub  7    7416896   7416954   58       iloveconsuming_kf0001.host.com-1481225033915-79b49de8-0
iloveconsuming  blumfrub  8    7416849   7416898   49       iloveconsuming_kf0001.host.com-1481225033939-1fe784c0-0
iloveconsuming  blumfrub  9    7416898   7416917   19       iloveconsuming_kf0001.host.com-1481225033961-40cc3185-0
iloveconsuming  blumfrub  10   35457186  35457221  35       iloveconsuming_kf0001.host.com-1481225033998-a817062e-0
iloveconsuming  blumfrub  11   7416866   7416909   43       iloveconsuming_kf0001.host.com-1481225034020-7a15999e-0
iloveconsuming  blumfrub  12   5434739   7416907   1982168  iloveconsuming_kf0001.host.com-1481225034043-badde97c-0
iloveconsuming  blumfrub  13   7416818   7416865   47       iloveconsuming_kf0001.host.com-1481225034066-6e77e3dc-0
iloveconsuming  blumfrub  14   7416901   7416947   46       iloveconsuming_kf0002.host.com-1481225107317-32355c51-0

Why are the partitions that fall on the newly added host so lagged?

Thanks
-jeremy

Re: Kafka windowed table not aggregating correctly

2016-12-09 Thread Sachin Mittal
Hi,
I think the window retention period does not solve the problem, only delays
it.
Based on what I understand say I set the time to 1 year using until.
Then when I get the message with timestamp 1 year + 1 sec it will delete
the old windows and create new ones from that message.
Now let us say we get next message with timestamp 1 year - 1 sec, based on
what you said, it will ignore this message.

In my case we get messages from different sources whose clocks are not in
sync. So overall messages come with increasing timestamps but for a short
duration there is no order guarantee.

So I think before deleting the older windows it should retain a small portion
of the old windows too, so nearby older messages are not dropped.

I suggest having something like windows.size.advanceBy.until.retain
Retain would retain the periods which fall within retain ms of the upper
bound.

So a window could be defined as
TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
1000l).until(365 * 24 * 3600 * 1000l).retain(900 * 1000l)
So when dropping older windows it would retain the ones that fall in the last 15
minutes.


Please let me know in case I missed something on how and if at all older
messages are dropped.

Thanks
Sachin





On Sat, Dec 10, 2016 at 5:45 AM, Guozhang Wang  wrote:

> Assuming your window retention period is the same as the window length,
> then it is true that ZZ will cause the current window to be dropped. And
> then when ZZA is received, it will not cause the old windows to be
> re-created but will be ignored since it is considered "expired".
>
> Note that you can set the window retention period much longer than the
> window length itself, using the "until" API I mentioned above to handle any
> sudden future records.
>
>
>
> Guozhang
>
> On Thu, Dec 8, 2016 at 8:19 PM, Sachin Mittal  wrote:
>
> > Hi,
> > Right now in order to circumvent this problem I am using a timestamp
> whose
> > values increase by few ms as and when I get new records.
> > So lets say I have records in order
> > A -> lower limit TS + 1 sec
> > B -> lower limit TS + 3 sec
> > C -> lower limit TS + 5 sec
> > ..
> > Z -> upper limit TS - 1 sec
> >
> > Now say I get a record ZZ with ts upper limit TS + 1 sec I assume it will
> > drop the previous windows and create new ones based on this timestamp.
> > Please confirm this understanding.
> >
> > Now lets say I get new record ZZA with timestamp (old) upper limit TS - 1
> > sec, will this again cause new windows to be dropped and recreate older
> > windows fresh with all the older aggregation done so far lost?
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> > On Fri, Dec 9, 2016 at 12:16 AM, Guozhang Wang 
> wrote:
> >
> > > Hello Sachin,
> > >
> > > I am with you that ideally the windowing segmentation implementation
> > should
> > > be totally abstracted from users but today it is a bit confusing to
> > > understand. I have filed JIRA some time ago to improve on this end:
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-3596
> > >
> > > So to your example, if a "far future record" was received whose
> timestamp
> > > is beyond current time + the retention period, it could potentially
> cause
> > > the current window to be dropped.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal 
> > wrote:
> > >
> > > > Hi,
> > > > I think now it makes all the sense. The field I was using for
> timestamp
> > > > extractor contains timestamps which spans for greater than a day's
> > > duration
> > > > and it worked for wall clock because for short duration timestamps
> were
> > > in
> > > > day's range.
> > > >
> > > > I wanted to understand one thing:
> > > > Say I have a timestamp extractor field and as record gets ingested
> > future
> > > > records will have increasing values for the timestamp.
> > > >
> > > > Now lets say default duration is one day. At a future time a record
> > will
> > > > have timestamp which now is greater than the initial day's range.
> > > > What will happen then, it will create a new segment and then create
> > > windows
> > > > in it for the next day's duration?
> > > > What happens if now it gets a record from the previous day, will it
> get
> > > > discarded or will it again have just the single value aggregated in
> it
> > > > (previous values are lost).
> > > > So when new segment is create as I understand does it retain the
> older
> > > > segments data.
> > > >
> > > > This is bit confusing, so would be helpful if you can explain in bit
> > more
> > > > detail.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > > On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > Sachin,
> > > > >
> > > > > One thing to note is that the retention of the windowed stores
> works
> > by
> > > > > keeping multiple segments of the stores where each segments stores
> a
> > > time
> > > > > range which can potentially span multiple windows, if a new window
> > > needs
> > > > to
> > > > > be created that is further from the

Re: lag for a specific partitions on newly added host

2016-12-09 Thread Jeremy Hansen
Here’s the topic description:

Topic:blumfrub  PartitionCount:15  ReplicationFactor:5  Configs:
Topic: blumfrub  Partition: 0   Leader: 1001  Replicas: 1001,0,1,2,3  Isr: 0,1001,1,2,3
Topic: blumfrub  Partition: 1   Leader: 0     Replicas: 0,1,2,3,4     Isr: 0,1,2,3,4
Topic: blumfrub  Partition: 2   Leader: 1     Replicas: 1,2,3,4,1001  Isr: 1001,1,2,3,4
Topic: blumfrub  Partition: 3   Leader: 2     Replicas: 2,3,4,1001,0  Isr: 0,1001,2,3,4
Topic: blumfrub  Partition: 4   Leader: 0     Replicas: 3,4,1001,0,1  Isr: 0,1001,1,3,4
Topic: blumfrub  Partition: 5   Leader: 4     Replicas: 4,1001,0,1,2  Isr: 0,1001,1,2,4
Topic: blumfrub  Partition: 6   Leader: 1001  Replicas: 1001,1,2,3,4  Isr: 1001,1,2,3,4
Topic: blumfrub  Partition: 7   Leader: 0     Replicas: 0,2,3,4,1001  Isr: 0,1001,2,3,4
Topic: blumfrub  Partition: 8   Leader: 1     Replicas: 1,3,4,1001,0  Isr: 0,1001,1,3,4
Topic: blumfrub  Partition: 9   Leader: 2     Replicas: 2,4,1001,0,1  Isr: 0,1001,1,2,4
Topic: blumfrub  Partition: 10  Leader: 0     Replicas: 3,1001,0,1,2  Isr: 0,1001,1,2,3
Topic: blumfrub  Partition: 11  Leader: 4     Replicas: 4,0,1,2,3     Isr: 0,1,2,3,4
Topic: blumfrub  Partition: 12  Leader: 1001  Replicas: 1001,2,3,4,0  Isr: 0,1001,2,3,4
Topic: blumfrub  Partition: 13  Leader: 0     Replicas: 0,3,4,1001,1  Isr: 0,1001,1,3,4
Topic: blumfrub  Partition: 14  Leader: 1     Replicas: 1,4,1001,0,2  Isr: 0,1001,1,2,4

1001 is the new broker.

-jeremy



> On Dec 9, 2016, at 8:55 PM, Jeremy Hansen  wrote:
> 
> I added a new host to kafka.  Partitions that fall on this new host have a 
> very high lag and I’m trying to understand why this would be and how to fix 
> it.
> 
> Group           Topic     Pid  Offset    logSize   Lag      Owner
> iloveconsuming  blumfrub  0    5434682   7416933   1982251  iloveconsuming_kf0001.host.com-1481225033576-47393b55-0
> iloveconsuming  blumfrub  1    7416828   7416875   47       iloveconsuming_kf0001.host.com-1481225033769-17152bca-0
> iloveconsuming  blumfrub  2    7416799   7416848   49       iloveconsuming_kf0001.host.com-1481225033791-77a30285-0
> iloveconsuming  blumfrub  3    7416898   7416903   5        iloveconsuming_kf0001.host.com-1481225033822-d088f844-0
> iloveconsuming  blumfrub  4    7416891   7416925   34       iloveconsuming_kf0001.host.com-1481225033846-78f8e5b5-0
> iloveconsuming  blumfrub  5    7416843   7416883   40       iloveconsuming_kf0001.host.com-1481225033869-54027178-0
> iloveconsuming  blumfrub  6    5434720   7416896   1982176  iloveconsuming_kf0001.host.com-1481225033891-cc3f6bf6-0
> iloveconsuming  blumfrub  7    7416896   7416954   58       iloveconsuming_kf0001.host.com-1481225033915-79b49de8-0
> iloveconsuming  blumfrub  8    7416849   7416898   49       iloveconsuming_kf0001.host.com-1481225033939-1fe784c0-0
> iloveconsuming  blumfrub  9    7416898   7416917   19       iloveconsuming_kf0001.host.com-1481225033961-40cc3185-0
> iloveconsuming  blumfrub  10   35457186  35457221  35       iloveconsuming_kf0001.host.com-1481225033998-a817062e-0
> iloveconsuming  blumfrub  11   7416866   7416909   43       iloveconsuming_kf0001.host.com-1481225034020-7a15999e-0
> iloveconsuming  blumfrub  12   5434739   7416907   1982168  iloveconsuming_kf0001.host.com-1481225034043-badde97c-0
> iloveconsuming  blumfrub  13   7416818   7416865   47       iloveconsuming_kf0001.host.com-1481225034066-6e77e3dc-0
> iloveconsuming  blumfrub  14   7416901   7416947   46       iloveconsuming_kf0002.host.com-1481225107317-32355c51-0
> 
> Why are the partitions that fall on the newly added host so lagged?
> 
> Thanks
> -jeremy