Re: Kafka windowed table not aggregating correctly

2016-11-25 Thread Sachin Mittal
Hi,
I fixed that sorted set issue but I am facing a weird problem which I am
not able to replicate.

Here is the sample problem that I could isolate:
My class is like this:
public static class Message implements Comparable<Message> {
    public long ts;
    public String message;
    public String key;
    public Message() {}
    public Message(long ts, String message, String key) {
        this.ts = ts;
        this.key = key;
        this.message = message;
    }
    public int compareTo(Message paramT) {
        long ts1 = paramT.ts;
        // note: never returns 0, so no two messages ever compare as equal
        return ts > ts1 ? 1 : -1;
    }
}

pipeline is like this:
builder.stream(Serdes.String(), messageSerde, "test-window-stream")
    .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
        public KeyValue<String, Message> apply(String key, Message value) {
            return new KeyValue<>(value.key, value);
        }
    })
    .through(Serdes.String(), messageSerde, "test-window-key-stream")
    .aggregateByKey(new Initializer<SortedSet<Message>>() {
        public SortedSet<Message> apply() {
            return new TreeSet<>();
        }
    }, new Aggregator<String, Message, SortedSet<Message>>() {
        public SortedSet<Message> apply(String aggKey, Message value,
                                        SortedSet<Message> aggregate) {
            aggregate.add(value);
            return aggregate;
        }
    }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
    Serdes.String(), messagesSerde)
    .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
        public void apply(Windowed<String> key, SortedSet<Message> messages) {
            ...
        }
    });

So basically I rekey the original message into another topic and then
aggregate it based on that key.
What I have observed is that when I use windowed aggregation the
aggregator does not use the previously aggregated value.

public SortedSet<Message> apply(String aggKey, Message value,
        SortedSet<Message> aggregate) {
    aggregate.add(value);
    return aggregate;
}

So in the above function the aggregate is an empty set for every value
entering the pipeline. When I remove the windowed aggregation, the
aggregate set retains the previously aggregated values.

I am just not able to wrap my head around it. When I run this type of test
locally on Windows it works fine. However, a similar pipeline setup run
against production on Linux behaves strangely and always ends up with an
empty aggregate set.
Any idea what the reason could be, and where I should look for the problem?
Does the length of the key string matter here? I will later try to run the
same simple setup on Linux and see what happens. But this is very strange
behavior.
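
To double-check my understanding of the hopping windows themselves, I used this
small standalone snippet (my own simplification, not the actual streams
internals) to see which windows of size 10s advancing by 5s a given record
timestamp falls into:

public class WindowCheck {
    public static void main(String[] args) {
        long sizeMs = 10 * 1000L;
        long advanceMs = 5 * 1000L;
        long ts = 14 * 1000L; // record timestamp, e.g. 14s
        // the latest window that can contain ts starts at the largest
        // multiple of advanceMs that is <= ts
        long lastStart = ts - (ts % advanceMs);
        for (long start = lastStart; start > ts - sizeMs; start -= advanceMs) {
            System.out.println("window [" + (start / 1000) + "s, "
                    + ((start + sizeMs) / 1000) + "s)");
        }
        // prints the two overlapping windows: [10s, 20s) and [5s, 15s)
    }
}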

Thanks
Sachin



On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang  wrote:

> Hello Sachin,
>
> In the implementation of SortedSet, if the object implements the
> Comparable interface, that compareTo function is applied in
> "aggregate.add(value);", and hence if it returns 0, the element will not be
> added since it is a Set.
>
>
> Guozhang
>
>
> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal 
> wrote:
>
> > Hi,
> > What I find is that when I use a sorted set as the aggregate, it fails to
> > aggregate the values for which compareTo returns 0.
> >
> > My class is like this:
> > public class Message implements Comparable<Message> {
> > public long ts;
> > public String message;
> > public Message() {};
> > public Message(long ts, String message) {
> > this.ts = ts;
> > this.message = message;
> > }
> > public int compareTo(Message paramT) {
> > long ts1 = paramT.ts;
> > return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> > }
> > }
> >
> > pipeline is like this:
> > builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > .aggregateByKey(new Initializer<SortedSet<Message>>() {
> > public SortedSet<Message> apply() {
> > return new TreeSet<>();
> > }
> > }, new Aggregator<String, Message, SortedSet<Message>>() {
> > public SortedSet<Message> apply(String aggKey, Message value,
> > SortedSet<Message> aggregate) {
> > aggregate.add(value);
> > return aggregate;
> > }
> > }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > Serdes.String(), messagesSerde)
> > .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> > public void apply(Windowed<String> key, SortedSet<Message> messages) {
> > ...
> > }
> > });
> >
> > So any message published between 10 and 20 seconds gets aggregated in the
> > 10 - 20 bucket and I print the size of the set.
> > However output I get is following:
> >
> > Published: 14
> > Aggregated: 10  20 -> 1
> >
> > Published: 18
> > Aggregated: 10  20 -> 2
> >
> > Published: 11
> > Aggregated: 10  20 -> 3
> >
> > Published: 17
> > Aggregated: 10  20 -> 4
> >
> > Published: 14
> > Aggregated: 10  20 -> 4
> >
> > Published: 15
> > Aggregated: 10  20 -> 5
> >
> > Published: 12
> > Aggregated: key2  10  20 -> 6
> >
> > Published: 12
> > Aggregated: 10  20 -> 6
> >
> > So if you see any message that occurs 
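
PS: the TreeSet de-duplication Guozhang described above is easy to reproduce in
isolation; here is a minimal sketch I used (Message simplified to just a
timestamp, with ordering and equality by timestamp only):

import java.util.TreeSet;

public class TreeSetDedup {
    static class Message implements Comparable<Message> {
        final long ts;
        Message(long ts) { this.ts = ts; }
        public int compareTo(Message other) {
            return Long.compare(ts, other.ts); // 0 for equal timestamps
        }
    }

    public static void main(String[] args) {
        TreeSet<Message> set = new TreeSet<>();
        set.add(new Message(14));
        set.add(new Message(18));
        set.add(new Message(14)); // compareTo == 0 -> treated as duplicate, not added
        System.out.println(set.size()); // prints 2, not 3
    }
}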

Re: Kafka consumers are not equally distributed

2016-11-25 Thread Guozhang Wang
You can take a look at this FAQ wiki:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified
?

And even if you are using the new Java producer: if you specify the key and
the key distribution is not even, then the data will not be evenly distributed
across the partitions.
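
As a rough illustration (this is not the exact DefaultPartitioner code, which
hashes the serialized key bytes, but the idea is the same): records with the
same key always land on the same partition, so a skewed key distribution gives
a skewed partition load.

public class PartitionSketch {
    // simplified stand-in for hash(keyBytes) % numPartitions
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // every record with key "hot-key" goes to the same one of the 160 partitions
        System.out.println(partitionFor("hot-key", 160));
    }
}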

Guozhang

On Fri, Nov 25, 2016 at 9:12 AM, Ghosh, Achintya (Contractor) <
achintya_gh...@comcast.com> wrote:

> So what is the option to make the messages equally distributed from that
> point? I mean, is there any other option to make the consumers speed up?
>
> Thanks
> Acintya
>
> -Original Message-
> From: Guozhang Wang [mailto:wangg...@gmail.com]
> Sent: Friday, November 25, 2016 12:09 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka consumers are not equally distributed
>
> Note that consumer's fetching parallelism is per-partition, i.e., one
> partition is fetched by only a single consumer instance, so even if some
> partitions have heavy load other idle consumers will not come to share the
> messages.
>
> If you observed that some partitions have no messages while others have a
> lot, then it means the producing load on the partitions is not evenly
> distributed; as I mentioned in the previous comment, it is not a consumer
> issue but a producer issue.
>
>
> Guozhang
>
> On Fri, Nov 25, 2016 at 7:11 AM, Ghosh, Achintya (Contractor) <
> achintya_gh...@comcast.com> wrote:
>
> > Thank you Guozhang.
> >
> > Let me clarify: by "some of the partitions are sitting idle and some of
> > are overloaded" I mean that we stopped the load after 9 hours as we saw
> > the messages were being processed very slowly. At that time we observed
> > that some partitions had a lot of messages and some were sitting idle. So
> > my question is why the messages were not shared if some are overloaded
> > and some have 0 messages. We even restarted the Kafka servers and the
> > application servers too, but nothing happened; it was still processing
> > very slowly and the messages were not redistributed. So we are concerned
> > about what to do in this kind of situation to make the consumers faster.
> >
> > Thanks
> > Achintya
> >
> > -Original Message-
> > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > Sent: Thursday, November 24, 2016 11:21 PM
> > To: users@kafka.apache.org
> > Subject: Re: Kafka consumers are not equally distributed
> >
> > The default partition assignment strategy is the RangePartitioner.
> > Note it is per-topic, so if you use the default partitioner then in
> > your case the 160 partitions of each of the topics will be assigned to the
> > first 160 consumer instances, each getting two partitions, one
> > partition from each topic. So the consumers should be balanced on a
> > consumer-instance basis.
> >
> > I'm not sure what you meant by "some of the partitions are sitting
> > idle and some of are overloaded": do you mean that some partitions
> > do not have new data coming in while others keep getting high traffic
> > produced to them, so that the consumer cannot keep up? In that case it is
> > not the consumer's issue, but the producer not producing in a balanced
> > manner.
> >
> >
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Thu, Nov 24, 2016 at 7:45 PM, Ghosh, Achintya (Contractor) <
> > achintya_gh...@comcast.com> wrote:
> >
> > > Java consumer. 0.9.1
> > >
> > > Thanks
> > > Achintya
> > >
> > > -Original Message-
> > > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > > Sent: Thursday, November 24, 2016 8:28 PM
> > > To: users@kafka.apache.org
> > > Subject: Re: Kafka consumers are not equally distributed
> > >
> > > Which version of Kafka are you using with your consumer? Is it Scala
> > > or Java consumers?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Nov 23, 2016 at 6:38 AM, Ghosh, Achintya (Contractor) <
> > > achintya_gh...@comcast.com> wrote:
> > >
> > > > No, that is not the reason. Initially all the partitions were
> > > > assigned messages and those were processed very fast and then sat
> > > > idle, even though other partitions still had a lot of messages to be
> > > > processed.
> > > > So I was under the impression that a rebalance should be triggered and
> > > > the messages would be redistributed equally again.
> > > >
> > > > Thanks
> > > > Achintya
> > > >
> > > > -Original Message-
> > > > From: Sharninder [mailto:sharnin...@gmail.com]
> > > > Sent: Wednesday, November 23, 2016 12:33 AM
> > > > To: users@kafka.apache.org
> > > > Cc: d...@kafka.apache.org
> > > > Subject: Re: Kafka consumers are not equally distributed
> > > >
> > > > Could it be because of the partition key ?
> > > >
> > > > On Wed, Nov 23, 2016 at 12:33 AM, Ghosh, Achintya (Contractor) <
> > > > achintya_gh...@comcast.com> wrote:
> > > >
> > > > > Hi there,
> > > > >
> > > > > We are doing a load test in Kafka with 25 tps and for the first 9 hours
> > > > > it went fine: almost 80K/hr messages were processed. After that we
> > > > > saw a lot of lag and we stopped the incoming load.
> > > > >
> > > > > Currently we see 

Re: Initializing StateStores takes *really* long for large datasets

2016-11-25 Thread Frank Lyaruu
I'm running all on a single node, so there is no 'data mobility' involved.
So if Streams does not use any existing data, I might as well wipe the
whole RocksDb before starting, right?

As for the RocksDb tuning, I am using a RocksDBConfigSetter, to reduce the
memory usage a bit:

options.setWriteBufferSize(300);
options.setMaxBytesForLevelBase(3000);
options.setMaxBytesForLevelMultiplier(3);

I needed to do this as my 16Gb machine would die otherwise, but I honestly
was just reducing values more or less randomly until it wouldn't fall over.
I have to say this is a big drawback of Rocks: I monitor Java memory usage,
but this just sneaks under the radar as it is off-heap, and it isn't very
clear what the implications of the different settings are, as I can't say
something like the Xmx heap setting, meaning: take whatever you need up to
this maximum. Also, if I get this right, in the long run, as the data set
changes and grows, I can never be sure it won't take too much memory.

I get the impression I'll be better off with an external store, something I
can monitor, tune and restart separately.

But I'm getting ahead of myself. I'll wipe the data before I start and see if
that gets me any stability.
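
In case it is useful to anyone, by "wipe" I just mean deleting the streams
state directory before starting, along these lines (adjust the path to your
own state.dir and application id; /tmp/kafka-streams is only the default):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;

public class WipeStateDir {
    public static void main(String[] args) throws IOException {
        Path stateDir = Paths.get("/tmp/kafka-streams/my-application-id");
        if (Files.exists(stateDir)) {
            // delete children before parents
            Files.walk(stateDir)
                 .sorted(Comparator.reverseOrder())
                 .map(Path::toFile)
                 .forEach(File::delete);
        }
    }
}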




On Fri, Nov 25, 2016 at 4:54 PM, Damian Guy  wrote:

> Hi Frank,
>
> If you have run the app before with the same applicationId, completely shut
> it down, and then restarted it again, it will need to restore all of the
> state which will take some time depending on the amount of data you have.
> In this case the placement of the partitions doesn't take into account any
> existing state stores, so it might need to load quite a lot of data if
> nodes assigned certain partitions don't have that state-store (this is
> something we should look at improving).
>
> As for RocksDB tuning - you can provide an implementation of
> RocksDBConfigSetter via config: StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS
> it has a single method:
>
> public void setConfig(final String storeName, final Options options,
> final Map<String, Object> configs)
>
> in this method you can set various options on the provided Options object.
> The options that might help in this case are:
> options.setWriteBufferSize(..)  - default in streams is 32MB
> options.setMaxWriteBufferNumber(..) - default in streams is 3
>
> However, I'm no expert on RocksDB and I suggest you have a look at
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide for more
> info.
>
> Thanks,
> Damian
>
> On Fri, 25 Nov 2016 at 13:02 Frank Lyaruu  wrote:
>
> > @Damian:
> >
> > Yes, it ran before, and it has that 200gb blob worth of Rocksdb stuff
> >
> > @Svente: It's on a pretty high end san in a managed private cloud, I'm
> > unsure what the ultimate storage is, but I doubt there is a performance
> > problem there.
> >
> > On Fri, 25 Nov 2016 at 13:37, Svante Karlsson 
> > wrote:
> >
> > > What kind of disk are you using for the rocksdb store? ie spinning or
> > ssd?
> > >
> > > 2016-11-25 12:51 GMT+01:00 Damian Guy :
> > >
> > > > Hi Frank,
> > > >
> > > > Is this on a restart of the application?
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Fri, 25 Nov 2016 at 11:09 Frank Lyaruu  wrote:
> > > >
> > > > > Hi y'all,
> > > > >
> > > > > I have a reasonably simple KafkaStream application, which merges
> > about
> > > 20
> > > > > topics a few times.
> > > > > The thing is, some of those topic datasets are pretty big, about
> 10M
> > > > > messages. In total I've got
> > > > > about 200Gb worth of state in RocksDB, the largest topic is 38 Gb.
> > > > >
> > > > > I had set the MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
> > > > > initialization time,
> > > > > but that does not seem nearly enough, I'm looking at more than two
> > hour
> > > > > startup times, and
> > > > > that starts to be a bit ridiculous.
> > > > >
> > > > > Any tips / experiences on how to deal with this case? Move away
> from
> > > > Rocks
> > > > > and use an external
> > > > > data store? Any tuning tips on how to tune Rocks to be a bit more
> > > useful
> > > > > here?
> > > > >
> > > > > regards, Frank
> > > > >
> > > >
> > >
> >
>


Re: no luck with kafka-connect on secure cluster

2016-11-25 Thread Koert Kuipers
well it seems if you run connect in distributed mode... its again
security.protocol=SASL_PLAINTEXT and not producer.security.protocol=
SASL_PLAINTEXT

dont ask me why
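
so for reference, the security related part of my configs now looks roughly
like this (same placeholder host as in the configs below):

# connect-distributed.properties
bootstrap.servers=SASL_PLAINTEXT://somenode:9092
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

# connect-standalone.properties additionally needed the producer-prefixed versions:
# producer.security.protocol=SASL_PLAINTEXT
# producer.sasl.kerberos.service.name=kafka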

On Thu, Nov 24, 2016 at 10:40 PM, Koert Kuipers  wrote:

> for anyone that runs into this. turns out i also had to set:
> producer.security.protocol=SASL_PLAINTEXT
> producer.sasl.kerberos.service.name=kafka
>
>
> On Thu, Nov 24, 2016 at 8:54 PM, Koert Kuipers  wrote:
>
>> i have a secure kafka 0.10.1 cluster using SASL_PLAINTEXT
>>
>> the kafka servers seem fine, and i can start console-consumer and
>> console-producer and i see the message i type in the producer pop up in the
>> consumer. no problems so far.
>>
>> for example to start console-producer:
>> $ kinit
>> $ export KAFKA_OPTS="-Djava.security.auth.login.config=config/jaas.conf"
>> $ bin/kafka-console-producer.sh --producer.config
>> config/producer.properties --topic test --broker-list
>> SASL_PLAINTEXT://somenode:9092
>>
>> but i am having no luck whatsoever with kafka-connect. i tried this:
>> $ kinit
>> $ export KAFKA_OPTS="-Djava.security.auth.login.config=config/jaas.conf"
>> $ bin/connect-standalone.sh config/connect-standalone.properties
>> config/connect-console-source.properties
>>
>> my config/connect-console-source.properties is unchanged. my
>> config/connect-standalone has:
>>
>> bootstrap.servers=SASL_PLAINTEXT://somenode:9092
>> security.protocol=SASL_PLAINTEXT
>> sasl.kerberos.service.name=kafka
>> key.converter=org.apache.kafka.connect.json.JsonConverter
>> value.converter=org.apache.kafka.connect.json.JsonConverter
>> internal.key.converter=org.apache.kafka.connect.json.JsonConverter
>> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
>> internal.key.converter.schemas.enable=false
>> internal.value.converter.schemas.enable=false
>> offset.storage.file.filename=/tmp/connect.offsets
>> offset.flush.interval.ms=1
>>
>> i get these logs in an infinite loop:
>> [2016-11-24 20:47:18,528] DEBUG Node -1 disconnected.
>> (org.apache.kafka.clients.NetworkClient:463)
>> [2016-11-24 20:47:18,528] WARN Bootstrap broker somenode:9092
>> disconnected (org.apache.kafka.clients.NetworkClient:568)
>> [2016-11-24 20:47:18,528] DEBUG Give up sending metadata request since no
>> node is available (org.apache.kafka.clients.NetworkClient:625)
>> [2016-11-24 20:47:18,629] DEBUG Initialize connection to node -1 for
>> sending metadata request (org.apache.kafka.clients.NetworkClient:644)
>> [2016-11-24 20:47:18,629] DEBUG Initiating connection to node -1 at
>> somenode:9092. (org.apache.kafka.clients.NetworkClient:496)
>> [2016-11-24 20:47:18,631] DEBUG Created socket with SO_RCVBUF = 32768,
>> SO_SNDBUF = 124928, SO_TIMEOUT = 0 to node -1 (org.apache.kafka.common.netwo
>> rk.Selector:327)
>> [2016-11-24 20:47:18,631] DEBUG Completed connection to node -1
>> (org.apache.kafka.clients.NetworkClient:476)
>> [2016-11-24 20:47:18,730] DEBUG Sending metadata request
>> {topics=[connect-test]} to node -1 (org.apache.kafka.clients.Netw
>> orkClient:640)
>> [2016-11-24 20:47:18,730] DEBUG Connection with somenode/192.168.1.54
>> disconnected (org.apache.kafka.common.network.Selector:365)
>> java.io.EOFException
>> at org.apache.kafka.common.network.NetworkReceive.readFromReada
>> bleChannel(NetworkReceive.java:83)
>> at org.apache.kafka.common.network.NetworkReceive.readFrom(
>> NetworkReceive.java:71)
>> at org.apache.kafka.common.network.KafkaChannel.receive(KafkaCh
>> annel.java:154)
>> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChann
>> el.java:135)
>> at org.apache.kafka.common.network.Selector.pollSelectionKeys(
>> Selector.java:343)
>> at org.apache.kafka.common.network.Selector.poll(Selector.java:
>> 291)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.
>> java:260)
>> at org.apache.kafka.clients.producer.internals.Sender.run(Sende
>> r.java:236)
>> at org.apache.kafka.clients.producer.internals.Sender.run(Sende
>> r.java:135)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> i tried different kafka-connect connectors, same result.
>>
>> any ideas? thanks!
>>
>
>


Re: A strange controller log in Kafka 0.9.0.1

2016-11-25 Thread Json Tu
thanks guozhang,
if it's convenient, can we discuss it in the jira
https://issues.apache.org/jira/browse/KAFKA-4447 ?
I guess somebody may also encounter this problem.

> On Nov 25, 2016, at 12:31 PM, Guozhang Wang wrote:
> 
> Does broker 100 keep acting as the controller afterwards? What you observe
> is possible and should be transient, since "unsubscribeChildChanges" on
> ZkClient and the listener-fired procedure are executed on different threads
> and they are not strictly synchronized. But if you continuously see broker
> 100's listeners fire and it acts like a controller, then there may be an
> issue with the 0.9.0.1 version.
> 
> Guozhang
> 
> On Wed, Nov 23, 2016 at 7:28 AM, Json Tu  wrote:
> 
>> Hi,
>>We have a cluster of kafka 0.9.0.1 with 3 nodes, and we found a
>> strange controller log as below.
>> 
>> [2016-11-07 03:14:48,575] INFO [SessionExpirationListener on 100], ZK
>> expired; shut down all controller components and try to re-elect
>> (kafka.controller.KafkaController$SessionExpirationListener)
>> [2016-11-07 03:14:48,578] DEBUG [Controller 100]: Controller resigning,
>> broker id 100 (kafka.controller.KafkaController)
>> [2016-11-07 03:14:48,579] DEBUG [Controller 100]: De-registering
>> IsrChangeNotificationListener (kafka.controller.KafkaController)
>> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutting down
>> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Stopped
>> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutdown
>> completed (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>> [2016-11-07 03:14:48,580] INFO [Partition state machine on Controller
>> 100]: Stopped partition state machine (kafka.controller.
>> PartitionStateMachine)
>> [2016-11-07 03:14:48,580] INFO [Replica state machine on controller 100]:
>> Stopped replica state machine (kafka.controller.ReplicaStateMachine)
>> [2016-11-07 03:14:48,583] INFO [Controller-100-to-broker-101-send-thread],
>> Shutting down (kafka.controller.RequestSendThread)
>> [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread],
>> Stopped  (kafka.controller.RequestSendThread)
>> [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread],
>> Shutdown completed (kafka.controller.RequestSendThread)
>> [2016-11-07 03:14:48,586] INFO [Controller-100-to-broker-100-send-thread],
>> Shutting down (kafka.controller.RequestSendThread)
>> [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread],
>> Stopped  (kafka.controller.RequestSendThread)
>> [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread],
>> Shutdown completed (kafka.controller.RequestSendThread)
>> [2016-11-07 03:14:48,587] INFO [Controller 100]: Broker 100 resigned as
>> the controller (kafka.controller.KafkaController)
>> [2016-11-07 03:14:48,652] DEBUG [IsrChangeNotificationListener] Fired!!!
>> (kafka.controller.IsrChangeNotificationListener)
>> [2016-11-07 03:14:48,668] INFO [BrokerChangeListener on Controller 100]:
>> Broker change listener fired for path /brokers/ids with children 101,100
>> (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
>> [2016-11-07 03:14:48,683] DEBUG [DeleteTopicsListener on 100]: Delete
>> topics listener fired for topics  to be deleted (kafka.controller.
>> PartitionStateMachine$DeleteTopicsListener)
>> [2016-11-07 03:14:48,687] INFO [AddPartitionsListener on 100]: Add
>> Partition triggered {"version":1,"partitions":{"4"
>> :[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
>> for path /brokers/topics/movie.gateway.merselllog.syncCinema
>> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
>> [2016-11-07 03:14:48,694] INFO [AddPartitionsListener on 100]: Add
>> Partition triggered {"version":1,"partitions":{"4"
>> :[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
>> for path /brokers/topics/push_3rdparty_high (kafka.controller.
>> PartitionStateMachine$AddPartitionsListener)
>> [2016-11-07 03:14:48,707] INFO [AddPartitionsListener on 100]: Add
>> Partition triggered {"version":1,"partitions":{"4"
>> :[101,102],"5":[102,100],"1":[101,100],"0":[100,102],"2":[102,101],"3":[100,101]}}
>> for path /brokers/topics/icb_msg_push_high_02 (kafka.controller.
>> PartitionStateMachine$AddPartitionsListener)
>> [2016-11-07 03:14:48,715] INFO [AddPartitionsListener on 100]: Add
>> Partition triggered {"version":1,"partitions":{"4"
>> :[102,100],"5":[100,101],"1":[102,101],"0":[101,100],"2":[100,102],"3":[101,102]}}
>> for path /brokers/topics/movie.gateway.merselllog.unlockSeat
>> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
>> 
>> 
>> From the log we can see that the old controller 100 resigned as the
>> controller successfully, but 

RE: Kafka consumers are not equally distributed

2016-11-25 Thread Ghosh, Achintya (Contractor)
So what is the option to make the messages equally distributed from that point?
I mean, is there any other option to make the consumers speed up?

Thanks
Acintya

-Original Message-
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Friday, November 25, 2016 12:09 PM
To: users@kafka.apache.org
Subject: Re: Kafka consumers are not equally distributed

Note that consumer's fetching parallelism is per-partition, i.e., one partition 
is fetched by only a single consumer instance, so even if some partitions have 
heavy load other idle consumers will not come to share the messages.

If you observed that some partitions have no messages while others have a lot,
then it means the producing load on the partitions is not evenly distributed;
as I mentioned in the previous comment, it is not a consumer issue but a
producer issue.


Guozhang

On Fri, Nov 25, 2016 at 7:11 AM, Ghosh, Achintya (Contractor) < 
achintya_gh...@comcast.com> wrote:

> Thank you Guozhang.
>
> Let me clarify: by "some of the partitions are sitting idle and some of
> are overloaded" I mean that we stopped the load after 9 hours as we saw
> the messages were being processed very slowly. At that time we observed
> that some partitions had a lot of messages and some were sitting idle. So
> my question is why the messages were not shared if some are overloaded
> and some have 0 messages. We even restarted the Kafka servers and the
> application servers too, but nothing happened; it was still processing
> very slowly and the messages were not redistributed. So we are concerned
> about what to do in this kind of situation to make the consumers faster.
>
> Thanks
> Achintya
>
> -Original Message-
> From: Guozhang Wang [mailto:wangg...@gmail.com]
> Sent: Thursday, November 24, 2016 11:21 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka consumers are not equally distributed
>
> The default partition assignment strategy is the RangePartitioner.
> Note it is per-topic, so if you use the default partitioner then in
> your case the 160 partitions of each of the topics will be assigned to the
> first 160 consumer instances, each getting two partitions, one
> partition from each topic. So the consumers should be balanced on a
> consumer-instance basis.
>
> I'm not sure what you meant by "some of the partitions are sitting
> idle and some of are overloaded": do you mean that some partitions
> do not have new data coming in while others keep getting high traffic
> produced to them, so that the consumer cannot keep up? In that case it is
> not the consumer's issue, but the producer not producing in a balanced manner.
>
>
>
>
> Guozhang
>
>
>
>
> On Thu, Nov 24, 2016 at 7:45 PM, Ghosh, Achintya (Contractor) < 
> achintya_gh...@comcast.com> wrote:
>
> > Java consumer. 0.9.1
> >
> > Thanks
> > Achintya
> >
> > -Original Message-
> > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > Sent: Thursday, November 24, 2016 8:28 PM
> > To: users@kafka.apache.org
> > Subject: Re: Kafka consumers are not equally distributed
> >
> > Which version of Kafka are you using with your consumer? Is it Scala 
> > or Java consumers?
> >
> >
> > Guozhang
> >
> >
> > On Wed, Nov 23, 2016 at 6:38 AM, Ghosh, Achintya (Contractor) < 
> > achintya_gh...@comcast.com> wrote:
> >
> > > No, that is not the reason. Initially all the partitions were
> > > assigned messages and those were processed very fast and then sat
> > > idle, even though other partitions still had a lot of messages to be
> > > processed.
> > > So I was under the impression that a rebalance should be triggered and
> > > the messages would be redistributed equally again.
> > >
> > > Thanks
> > > Achintya
> > >
> > > -Original Message-
> > > From: Sharninder [mailto:sharnin...@gmail.com]
> > > Sent: Wednesday, November 23, 2016 12:33 AM
> > > To: users@kafka.apache.org
> > > Cc: d...@kafka.apache.org
> > > Subject: Re: Kafka consumers are not equally distributed
> > >
> > > Could it be because of the partition key ?
> > >
> > > On Wed, Nov 23, 2016 at 12:33 AM, Ghosh, Achintya (Contractor) < 
> > > achintya_gh...@comcast.com> wrote:
> > >
> > > > Hi there,
> > > >
> > > > We are doing a load test in Kafka with 25 tps and for the first 9 hours
> > > > it went fine: almost 80K/hr messages were processed. After that we
> > > > saw a lot of lag and we stopped the incoming load.
> > > >
> > > > Currently we see 15K/hr messages being processed. We have 40
> > > > consumer instances with concurrency 4 and 2 topics, both having
> > > > 160 partitions, so each consumer thread gets one partition from each topic.
> > > >
> > > > What we found is that some of the partitions are sitting idle and
> > > > some of them are overloaded, and it is really slowing down the consumer
> > > > message processing.
> > > >
> > > > Why is rebalancing not happening, and why are the existing messages not
> > > > distributed equally among the instances? We tried to restart the
> > > > app; still the same pace. Any idea what could be the reason?
> > > >
> > > > Thanks
> > > > Achintya
> > > >
> 

test

2016-11-25 Thread Samy CHBINOU

test


Re: Initializing StateStores takes *really* long for large datasets

2016-11-25 Thread Damian Guy
Hi Frank,

If you have run the app before with the same applicationId, completely shut
it down, and then restarted it again, it will need to restore all of the
state which will take some time depending on the amount of data you have.
In this case the placement of the partitions doesn't take into account any
existing state stores, so it might need to load quite a lot of data if
nodes assigned certain partitions don't have that state-store (this is
something we should look at improving).

As for RocksDB tuning - you can provide an implementation of
RocksDBConfigSetter via config: StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS
it has a single method:

public void setConfig(final String storeName, final Options options,
final Map<String, Object> configs)

in this method you can set various options on the provided Options object.
The options that might help in this case are:
options.setWriteBufferSize(..)  - default in streams is 32MB
options.setMaxWriteBufferNumber(..) - default in streams is 3

However, I'm no expert on RocksDB and I suggest you have a look at
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide for more info.

Thanks,
Damian
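
For completeness, a minimal config setter would look something like the sketch
below (illustrative only, not tested; the exact config constant name may differ
slightly between versions):

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // smaller memtables than the streams defaults (32MB x 3)
        options.setWriteBufferSize(8 * 1024 * 1024L);
        options.setMaxWriteBufferNumber(2);
    }
}

// registered via something like:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);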

On Fri, 25 Nov 2016 at 13:02 Frank Lyaruu  wrote:

> @Damian:
>
> Yes, it ran before, and it has that 200gb blob worth of Rocksdb stuff
>
> @Svente: It's on a pretty high end san in a managed private cloud, I'm
> unsure what the ultimate storage is, but I doubt there is a performance
> problem there.
>
> On Fri, 25 Nov 2016 at 13:37, Svante Karlsson 
> wrote:
>
> > What kind of disk are you using for the rocksdb store? ie spinning or
> ssd?
> >
> > 2016-11-25 12:51 GMT+01:00 Damian Guy :
> >
> > > Hi Frank,
> > >
> > > Is this on a restart of the application?
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Fri, 25 Nov 2016 at 11:09 Frank Lyaruu  wrote:
> > >
> > > > Hi y'all,
> > > >
> > > > I have a reasonably simple KafkaStream application, which merges
> about
> > 20
> > > > topics a few times.
> > > > The thing is, some of those topic datasets are pretty big, about 10M
> > > > messages. In total I've got
> > > > about 200Gb worth of state in RocksDB, the largest topic is 38 Gb.
> > > >
> > > > I had set the MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
> > > > initialization time,
> > > > but that does not seem nearly enough, I'm looking at more than two
> hour
> > > > startup times, and
> > > > that starts to be a bit ridiculous.
> > > >
> > > > Any tips / experiences on how to deal with this case? Move away from
> > > Rocks
> > > > and use an external
> > > > data store? Any tuning tips on how to tune Rocks to be a bit more
> > useful
> > > > here?
> > > >
> > > > regards, Frank
> > > >
> > >
> >
>


RE: Kafka consumers are not equally distributed

2016-11-25 Thread Ghosh, Achintya (Contractor)
Thank you Guozhang.

Let me clarify: by "some of the partitions are sitting idle and some of are
overloaded" I mean that we stopped the load after 9 hours as we saw the messages
were being processed very slowly. At that time we observed that some partitions
had a lot of messages and some were sitting idle. So my question is why the
messages were not shared if we see some are overloaded and some are having 0
messages. We even restarted the Kafka servers and the application servers too,
but nothing happened; it was still processing very slowly and the messages were
not redistributed. So we are concerned about what to do in this kind of
situation to make the consumers faster.

Thanks
Achintya

-Original Message-
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Thursday, November 24, 2016 11:21 PM
To: users@kafka.apache.org
Subject: Re: Kafka consumers are not equally distributed

The default partition assignment strategy is the RangePartitioner. Note it is
per-topic, so if you use the default partitioner then in your case the 160
partitions of each of the topics will be assigned to the first 160 consumer
instances, each getting two partitions, one partition from each topic. So the
consumers should be balanced on a consumer-instance basis.

I'm not sure what you meant by "some of the partitions are sitting idle and
some of are overloaded": do you mean that some partitions do not have new
data coming in while others keep getting high traffic produced to them, so that
the consumer cannot keep up? In that case it is not the consumer's issue, but
the producer not producing in a balanced manner.




Guozhang




On Thu, Nov 24, 2016 at 7:45 PM, Ghosh, Achintya (Contractor) < 
achintya_gh...@comcast.com> wrote:

> Java consumer. 0.9.1
>
> Thanks
> Achintya
>
> -Original Message-
> From: Guozhang Wang [mailto:wangg...@gmail.com]
> Sent: Thursday, November 24, 2016 8:28 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka consumers are not equally distributed
>
> Which version of Kafka are you using with your consumer? Is it Scala 
> or Java consumers?
>
>
> Guozhang
>
>
> On Wed, Nov 23, 2016 at 6:38 AM, Ghosh, Achintya (Contractor) < 
> achintya_gh...@comcast.com> wrote:
>
> > No, that is not the reason. Initially all the partitions were
> > assigned messages and those were processed very fast and then sat
> > idle, even though other partitions still had a lot of messages to be processed.
> > So I was under the impression that a rebalance should be triggered and
> > the messages would be redistributed equally again.
> >
> > Thanks
> > Achintya
> >
> > -Original Message-
> > From: Sharninder [mailto:sharnin...@gmail.com]
> > Sent: Wednesday, November 23, 2016 12:33 AM
> > To: users@kafka.apache.org
> > Cc: d...@kafka.apache.org
> > Subject: Re: Kafka consumers are not equally distributed
> >
> > Could it be because of the partition key ?
> >
> > On Wed, Nov 23, 2016 at 12:33 AM, Ghosh, Achintya (Contractor) < 
> > achintya_gh...@comcast.com> wrote:
> >
> > > Hi there,
> > >
> > > We are doing a load test in Kafka with 25 tps and for the first 9 hours
> > > it went fine: almost 80K/hr messages were processed. After that we
> > > saw a lot of lag and we stopped the incoming load.
> > >
> > > Currently we see 15K/hr messages being processed. We have 40
> > > consumer instances with concurrency 4 and 2 topics, both having
> > > 160 partitions, so each consumer thread gets one partition from each topic.
> > >
> > > What we found is that some of the partitions are sitting idle and
> > > some of them are overloaded, and it is really slowing down the consumer
> > > message processing.
> > >
> > > Why is rebalancing not happening, and why are the existing messages not
> > > distributed equally among the instances? We tried to restart the
> > > app; still the same pace. Any idea what could be the reason?
> > >
> > > Thanks
> > > Achintya
> > >
> > >
> >
> >
> > --
> > --
> > Sharninder
> >
>
>
>
> --
> -- Guozhang
>



--
-- Guozhang


Re: Initializing StateStores takes *really* long for large datasets

2016-11-25 Thread Frank Lyaruu
@Damian:

Yes, it ran before, and it has that 200gb blob worth of Rocksdb stuff

@Svente: It's on a pretty high end san in a managed private cloud, I'm
unsure what the ultimate storage is, but I doubt there is a performance
problem there.

On Fri, 25 Nov 2016 at 13:37, Svante Karlsson 
wrote:

> What kind of disk are you using for the rocksdb store? ie spinning or ssd?
>
> 2016-11-25 12:51 GMT+01:00 Damian Guy :
>
> > Hi Frank,
> >
> > Is this on a restart of the application?
> >
> > Thanks,
> > Damian
> >
> > On Fri, 25 Nov 2016 at 11:09 Frank Lyaruu  wrote:
> >
> > > Hi y'all,
> > >
> > > I have a reasonably simple KafkaStream application, which merges about
> 20
> > > topics a few times.
> > > The thing is, some of those topic datasets are pretty big, about 10M
> > > messages. In total I've got
> > > about 200Gb worth of state in RocksDB, the largest topic is 38 Gb.
> > >
> > > I had set the MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
> > > initialization time,
> > > but that does not seem nearly enough, I'm looking at more than two hour
> > > startup times, and
> > > that starts to be a bit ridiculous.
> > >
> > > Any tips / experiences on how to deal with this case? Move away from
> > Rocks
> > > and use an external
> > > data store? Any tuning tips on how to tune Rocks to be a bit more
> useful
> > > here?
> > >
> > > regards, Frank
> > >
> >
>


Re: Initializing StateStores takes *really* long for large datasets

2016-11-25 Thread Svante Karlsson
What kind of disk are you using for the rocksdb store? ie spinning or ssd?

2016-11-25 12:51 GMT+01:00 Damian Guy :

> Hi Frank,
>
> Is this on a restart of the application?
>
> Thanks,
> Damian
>
> On Fri, 25 Nov 2016 at 11:09 Frank Lyaruu  wrote:
>
> > Hi y'all,
> >
> > I have a reasonably simple KafkaStream application, which merges about 20
> > topics a few times.
> > The thing is, some of those topic datasets are pretty big, about 10M
> > messages. In total I've got
> > about 200Gb worth of state in RocksDB, the largest topic is 38 Gb.
> >
> > I had set the MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
> > initialization time,
> > but that does not seem nearly enough, I'm looking at more than two hour
> > startup times, and
> > that starts to be a bit ridiculous.
> >
> > Any tips / experiences on how to deal with this case? Move away from
> Rocks
> > and use an external
> > data store? Any tuning tips on how to tune Rocks to be a bit more useful
> > here?
> >
> > regards, Frank
> >
>


Re: Kafka producer dropping records

2016-11-25 Thread Ismael Juma
Hi Varun,

You could increase `retries`, but seems like you already configured it to
be `100`. Another option is to increase `retry.backoff.ms` which will
increase the time between retries.

Ismael
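
For reference, a rough sketch of the callback approach (error handling here is
just illustrative):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class LoggingSend {
    static void sendLogged(Producer<String, String> producer, String topic, String line) {
        producer.send(new ProducerRecord<>(topic, line), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // expired batches show up here, e.g. as a TimeoutException
                    System.err.println("send failed for a record: " + exception);
                }
            }
        });
    }
}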

On Fri, Nov 25, 2016 at 9:38 AM, Phadnis, Varun 
wrote:

> Hello,
>
> Sorry for the late response, we tried logging the errors received in the
> callback and the result is that we are facing TimeoutExceptions
>
> org.apache.kafka.common.errors.TimeoutException: Batch containing
> 93 record(s) expired due to timeout while requesting metadata from brokers
> for mp_test2-1
>
> Increasing the request.timeout.ms=10 (from default of 3) stopped
> the messages from being dropped. However, that seems like a solution which
> would not scale if there were an unpredictable "burst" of slowness in the
> network causing a longer delay.
>
> Is there a better way to handle this? Is there any other producer/broker
> configuration I could tweak to increase the reliability of the producer?
>
> Thanks,
> Varun
>
> -Original Message-
> From: isma...@gmail.com [mailto:isma...@gmail.com] On Behalf Of Ismael
> Juma
> Sent: 22 November 2016 08:31
> To: Kafka Users 
> Subject: Re: Kafka producer dropping records
>
> Another option which is probably easier is to pass a callback to `send`
> and log errors.
>
> Ismael
>
> On Tue, Nov 22, 2016 at 10:33 AM, Ismael Juma  wrote:
>
> > You can collect the Futures and call `get` in batches. That would give
> > you access to the errors without blocking on each request.
> >
> > Ismael
> >
> >
> > On Tue, Nov 22, 2016 at 8:56 AM, Phadnis, Varun
> > 
> > wrote:
> >
> >> Hello,
> >>
> >> We had tried that... If future.get() is added in the while loop, it
> >> takes too long for the loop to execute.
> >>
> >> Last time we tried it, it was running for that file for over 2 hours
> >> and still not finished.
> >>
> >> Regards,
> >> Varun
> >>
> >> -Original Message-
> >> From: Jaikiran Pai [mailto:jai.forums2...@gmail.com]
> >> Sent: 22 November 2016 02:20
> >> To: users@kafka.apache.org
> >> Subject: Re: Kafka producer dropping records
> >>
>> The KafkaProducer.send returns a Future<RecordMetadata>. What happens
> >> when you add a future.get() on the returned Future, in that while
> >> loop, for each sent record?
> >>
> >> -Jaikiran
> >>
> >> On Tuesday 22 November 2016 12:45 PM, Phadnis, Varun wrote:
> >> > Hello,
> >> >
> >> > We have the following piece of code where we read lines from a file
> >> > and
> >> push them to a Kafka topic :
> >> >
> >> >  Properties properties = new Properties();
> >> >  properties.put("bootstrap.servers", );
> >> >  properties.put("key.serializer",
> >> StringSerializer.class.getCanonicalName());
> >> >  properties.put("value.serializer",
> >> StringSerializer.class.getCanonicalName());
> >> >  properties.put("retries",100);
> >> > properties.put("acks", "all");
> >> >
>> > KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
> >> >
> >> >  try (BufferedReader bf = new BufferedReader(new
> >> InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
> >> >  String line;
> >> >  int count = 0;
> >> >  while ((line = bf.readLine()) != null) {
> >> >  count++;
> >> >  producer.send(new ProducerRecord<>(topicName, line));
> >> >  }
> >> >  Logger.log("Done producing data messages. Total no of
> >> records produced:" + count);
> >> >  } catch (InterruptedException | ExecutionException |
> >> IOException e) {
> >> >  Throwables.propagate(e);
> >> >  } finally {
> >> >  producer.close();
> >> >  }
> >> >
> >> > When we try this with a large file with a million records, only
> >> > half of
> >> them around 500,000 get written to the topic. In the above example, I
> >> verified this by running the GetOffset tool after fair amount of time
> >> (to ensure all records had finished processing) as follows:
> >> >
> >> >
> >> >  ./kafka-run-class.sh kafka.tools.GetOffsetShell
> >> > --broker-list  --time -1 --topic 
> >> >
> >> >
> >> >
> >> >
> >> > The output of this was :
> >> >
> >> >
> >> >  topic_name:1:292954
> >> >
> >> >  topic_name:0:296787
> >> >
> >> >
> >> > What could be causing this dropping of records?
> >> >
> >> > Thanks,
> >> > Varun
> >> >
> >>
> >>
> >
>


Re: Initializing StateStores takes *really* long for large datasets

2016-11-25 Thread Damian Guy
Hi Frank,

Is this on a restart of the application?

Thanks,
Damian

On Fri, 25 Nov 2016 at 11:09 Frank Lyaruu  wrote:

> Hi y'all,
>
> I have a reasonably simple KafkaStream application, which merges about 20
> topics a few times.
> The thing is, some of those topic datasets are pretty big, about 10M
> messages. In total I've got
> about 200Gb worth of state in RocksDB, the largest topic is 38 Gb.
>
> I had set the MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
> initialization time,
> but that does not seem nearly enough, I'm looking at more than two hour
> startup times, and
> that starts to be a bit ridiculous.
>
> Any tips / experiences on how to deal with this case? Move away from Rocks
> and use an external
> data store? Any tuning tips on how to tune Rocks to be a bit more useful
> here?
>
> regards, Frank
>


Re: Messages intermittently get lost

2016-11-25 Thread Zac Harvey
Hi Martin,


My server.properties looks like this:


listeners=PLAINTEXT://0.0.0.0:9092

advertised.host.name=

broker.id=2

port=9092

num.partitions=4

zookeeper.connect=zkA:2181,zkB:2181,zkC:2181

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

log.dirs=/tmp/kafka-logs

num.recovery.threads.per.data.dir=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=30

zookeeper.connection.timeout.ms=6000

delete.topic.enable=true

auto.leader.rebalance.enable=true


Above, 'zkA', 'zkB' and 'zkC' are defined in /etc/hosts and are valid ZK 
servers, and  is the public DNS of the EC2 (AWS) node that 
this Kafka is running on.


Anything look incorrect to you?


And yes, yesterday was a holiday, but there is only work! I'll celebrate one 
big, long holiday when I'm dead!


Thanks for any-and-all input here!


Best,

Zac



From: Martin Gainty 
Sent: Thursday, November 24, 2016 9:03:33 AM
To: users@kafka.apache.org
Subject: Re: Messages intermittently get lost

Hi Zach


there is a rumour that today thursday is a holiday?

in server.properties how are you configuring your server?


specifically what are these attributes?


num.network.threads=


num.io.threads=


socket.send.buffer.bytes=


socket.receive.buffer.bytes=


socket.request.max.bytes=


num.partitions=


num.recovery.threads.per.data.dir=


?

Martin
__




From: Zac Harvey 
Sent: Thursday, November 24, 2016 7:05 AM
To: users@kafka.apache.org
Subject: Re: Messages intermittently get lost

Anybody?!? This is very disconcerting!


From: Zac Harvey 
Sent: Wednesday, November 23, 2016 5:07:45 AM
To: users@kafka.apache.org
Subject: Messages intermittently get lost

I am playing around with Kafka and have a simple setup:


* 1-node Kafka (Ubuntu) server

* 3-node ZK cluster (each on their own Ubuntu server)


I have a consumer written in Scala and am using the kafka-console-producer 
(v0.10) that ships with the distribution.


I'd say about 20% of the messages I send via the producer never get consumed by 
the Scala process (which is running continuously). No errors on either side 
(producer or consumer): the producer sends, and, nothing...


Any ideas as to what might be going on here, or how I could start 
troubleshooting?


Thanks!


Initializing StateStores takes *really* long for large datasets

2016-11-25 Thread Frank Lyaruu
Hi y'all,

I have a reasonably simple KafkaStream application, which merges about 20
topics a few times.
The thing is, some of those topic datasets are pretty big, about 10M
messages. In total I've got
about 200Gb worth of state in RocksDB, the largest topic is 38 Gb.

I had set the MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
initialization time,
but that does not seem nearly enough, I'm looking at more than two hour
startup times, and
that starts to be a bit ridiculous.

Any tips / experiences on how to deal with this case? Move away from Rocks
and use an external
data store? Any tuning tips on how to tune Rocks to be a bit more useful
here?

regards, Frank


Re: KafkaStreams KTable#through not creating changelog topic

2016-11-25 Thread Mikael Högqvist
Thanks, based on this we will re-evaluate the use of internal topics. The
main motivation for using the internal changelog topics was to avoid
duplication of data and have an easy way to access the update stream of any
state store.

Best,
Mikael
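
For anyone following the thread, the workaround suggested earlier, i.e.
pre-creating the changelog-convention topic and materializing through it,
looks roughly like this (sketch only; `counts` stands for whatever KTable the
topology produces at that point):

// topic "words-count-changelog" created upfront, e.g. with kafka-topics.sh,
// following the <applicationId>-<storeName>-changelog naming convention
counts.through("words-count-changelog", "count");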

On Fri, Nov 25, 2016 at 9:52 AM Michael Noll  wrote:

> Mikael,
>
> > Sure, I guess the topic is auto-created the first time I start the
> topology
> > and the second time its there already. It could be possible to create
> > topics up front for us, or even use an admin call from inside the code.
>
> Yes, that (i.e. you are running with auto-topic creation enabled) was what
> I implicitly understood.  As covered in [1] we strongly recommend to
> manually pre-create/manage user topics though.  User topics include the
> source topics that you are reading from (cf. `stream()`, `table()`) but
> also include the topics you use in `through()` and `to()`.
>
>
> > Today there is .through(topic, store) and
> > .to(topic), maybe it would be possible to have something like
> > .materialize(store) which takes care of topic creation? Would adding
> > something like this require a KIP?
>
> There is already work being done in the Admin API (KIP-4), and part of this
> functionality was released in the latest Kafka versions.  You can use this
> to programmatically create topics, for example.  Note though that the work
> on KIP-4 is not fully completed yet.
>
> -Michael
>
>
>
>
> [1]
>
> http://docs.confluent.io/current/streams/developer-guide.html#managing-topics-of-a-kafka-streams-application
>
>
>
> On Thu, Nov 24, 2016 at 3:55 PM, Mikael Högqvist 
> wrote:
>
> > Sure, I guess the topic is auto-created the first time I start the
> topology
> > and the second time its there already. It could be possible to create
> > topics up front for us, or even use an admin call from inside the code.
> >
> > That said, as a user, I think it would be great with a function in the
> > Kafka Streams DSL that would allow me to materialize a KTable without
> > pre-creating the topic. Today there is .through(topic, store) and
> > .to(topic), maybe it would be possible to have something like
> > .materialize(store) which takes care of topic creation? Would adding
> > something like this require a KIP?
> >
> > Best,
> > Mikael
> >
> > On Thu, Nov 24, 2016 at 1:44 PM Damian Guy  wrote:
> >
> > Mikeal,
> >
> > When you use `through(..)` topics are not created by KafkaStreams. You
> need
> > to create them yourself before you run the application.
> >
> > Thanks,
> > Damian
> >
> > On Thu, 24 Nov 2016 at 11:27 Mikael Högqvist 
> wrote:
> >
> > > Yes, the naming is not an issue.
> > >
> > > I've tested this with the topology described earlier. Every time I
> start
> > > the topology with a call to .through() that references a topic that
> does
> > > not exist, I get an exception from the UncaughtExceptionHandler:
> > >
> > > Uncaught exception org.apache.kafka.streams.errors.StreamsException:
> > Topic
> > > not found during partition assignment: words-count-changelog
> > >
> > > This happens when .through("words-count-changelog", "count") is part of
> > the
> > > topology. The topology is also not forwarding anything to that
> > topic/store.
> > > After restarting the application it works fine.
> > >
> > > Are the changelog topics created via, for example, .aggregate()
> different
> > > to topics auto created via .through()?
> > >
> > > Thanks,
> > > Mikael
> > >
> > > On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax  >
> > > wrote:
> > >
> > > > > 1) Create a state store AND the changelog
> > > > > topic 2) follow the Kafka Streams naming convention for changelog
> > > topics.
> > > > > Basically, I want to have a method that does what .through() is
> > > supposed
> > > > to
> > > > > do according to the documentation, but without the "topic"
> parameter.
> > > >
> > > > I understand what you are saying, but you can get this done right
> now,
> > > > too. If you use through(...) you will get the store. And you can just
> > > > specify the topic name as "applicationId-storeName-changelog" to
> > follow
> > > > the naming convention Streams used internally. What is the problem
> > using
> > > > this approach (besides that you have to provide the topic name which
> > > > seems not to be a big burden to me?)
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> > > > > Hi Michael,
> > > > >
> > > > > thanks for the extensive explanation, and yes it definitely helps
> > with
> > > my
> > > > > understanding of through(). :)
> > > > >
> > > > > You guessed correctly that I'm doing some "shenanings" where I'm
> > trying
> > > > to
> > > > > derive the changelog of a state store from the state store name.
> This
> > > > works
> > > > > perfectly fine with with a naming convention for the topics and by
> > > > creating
> > > > > them in Kafka upfront.
> > 

Re: Data (re)processing with Kafka (new wiki page)

2016-11-25 Thread saiprasad mishra
This page is really helpful. Thanks for putting this together.

Some nice-to-have features could be (not sure if this wiki page is the right place):

1) Pause and resume without having to start and stop.
It should drain all the in-flight calculations before doing the actual pause,
and a notifier that it is actually paused will be helpful.
This could be much more lightweight, if possible, than stopping all tasks and
stores and starting them again, which might take a lot of time.


2) If the metadata API response can have all the topology graphs and sub
graphs, with all the nodes and edges for each sub graph and the corresponding
state store names, then it will be easier to build some UI on top of it. The
pipes between the sub graphs, which are Kafka topics, also need to be
called out, and the time semantics could be laid out on top of it. This
is something like a logical view on top of the physical view which the
current API has.


Regards
Sai

On Fri, Nov 25, 2016 at 12:53 AM, Michael Noll  wrote:

> Thanks a lot, Matthias!
>
> I have already begun to provide feedback.
>
> -Michael
>
>
>
> On Wed, Nov 23, 2016 at 11:41 PM, Matthias J. Sax 
> wrote:
>
> > Hi,
> >
> > we added a new wiki page that is supposed to collect data (re)processing
> > scenario with Kafka:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams+Data+(Re)
> > Processing+Scenarios
> >
> > We added already a couple of scenarios we think might be common and want
> > to invite all of you to add more. This helps to get a better overview of
> > requirements to enable new use cases.
> >
> > We are looking forward to your feedback!
> >
> >
> > -Matthias
> >
> >
>


Graceful shutdown on Windows when using procrun

2016-11-25 Thread Harald Kirsch

Hi all,

we are using apache-daemon (aka procrun) to run the Kafka broker as a
Windows service. This does not create a process with 'kafka.Kafka' in
the name, so bin\windows\kafka-server-stop.bat does not work.


Instead we use stop-service to shut down the service (=procrun=Kafka), 
but this seems to leave a lot of segments in a state needing recovery 
(as opposed to what should be possible according to 
http://kafka.apache.org/documentation#basic_ops_restarting)


Does anyone have a recommendation for how to gracefully shut down the
Kafka-in-procrun setup?


Thanks
Harald


RE: Kafka producer dropping records

2016-11-25 Thread Phadnis, Varun
Hello,

Sorry for the late response, we tried logging the errors received in the 
callback and the result is that we are facing TimeoutExceptions

org.apache.kafka.common.errors.TimeoutException: Batch containing 93 
record(s) expired due to timeout while requesting metadata from brokers for 
mp_test2-1

Increasing the request.timeout.ms=10 (from default of 3) stopped the
messages from being dropped. However, that seems like a solution which would not
scale if there were an unpredictable "burst" of slowness in the network causing
a longer delay.

Is there a better way to handle this? Is there any other producer/broker 
configuration I could tweak to increase the reliability of the producer? 

Thanks,
Varun
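
For reference, the batched variant of the send loop that was suggested earlier
in the thread would look roughly like this (sketch, not tested; it reuses the
producer, bf and topicName from the code quoted below):

List<Future<RecordMetadata>> batch = new ArrayList<>();
String line;
while ((line = bf.readLine()) != null) {
    batch.add(producer.send(new ProducerRecord<>(topicName, line)));
    if (batch.size() == 10000) {              // wait for every 10k sends before continuing
        for (Future<RecordMetadata> f : batch) {
            f.get();                          // throws if the record ultimately failed
        }
        batch.clear();
    }
}
for (Future<RecordMetadata> f : batch) {      // drain the final partial batch
    f.get();
}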

-Original Message-
From: isma...@gmail.com [mailto:isma...@gmail.com] On Behalf Of Ismael Juma
Sent: 22 November 2016 08:31
To: Kafka Users 
Subject: Re: Kafka producer dropping records

Another option which is probably easier is to pass a callback to `send` and log 
errors.

Ismael

On Tue, Nov 22, 2016 at 10:33 AM, Ismael Juma  wrote:

> You can collect the Futures and call `get` in batches. That would give 
> you access to the errors without blocking on each request.
>
> Ismael
>
>
> On Tue, Nov 22, 2016 at 8:56 AM, Phadnis, Varun 
> 
> wrote:
>
>> Hello,
>>
>> We had tried that... If future.get() is added in the while loop, it 
>> takes too long for the loop to execute.
>>
>> Last time we tried it, it was running for that file for over 2 hours 
>> and still not finished.
>>
>> Regards,
>> Varun
>>
>> -Original Message-
>> From: Jaikiran Pai [mailto:jai.forums2...@gmail.com]
>> Sent: 22 November 2016 02:20
>> To: users@kafka.apache.org
>> Subject: Re: Kafka producer dropping records
>>
>> The KafkaProducer.send returns a Future<RecordMetadata>. What happens
>> when you add a future.get() on the returned Future, in that while 
>> loop, for each sent record?
>>
>> -Jaikiran
>>
>> On Tuesday 22 November 2016 12:45 PM, Phadnis, Varun wrote:
>> > Hello,
>> >
>> > We have the following piece of code where we read lines from a file 
>> > and
>> push them to a Kafka topic :
>> >
>> >  Properties properties = new Properties();
>> >  properties.put("bootstrap.servers", );
>> >  properties.put("key.serializer",
>> StringSerializer.class.getCanonicalName());
>> >  properties.put("value.serializer",
>> StringSerializer.class.getCanonicalName());
>> >  properties.put("retries",100);
>> > properties.put("acks", "all");
>> >
>> > KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
>> >
>> >  try (BufferedReader bf = new BufferedReader(new
>> InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
>> >  String line;
>> >  int count = 0;
>> >  while ((line = bf.readLine()) != null) {
>> >  count++;
>> >  producer.send(new ProducerRecord<>(topicName, line));
>> >  }
>> >  Logger.log("Done producing data messages. Total no of
>> records produced:" + count);
>> >  } catch (InterruptedException | ExecutionException |
>> IOException e) {
>> >  Throwables.propagate(e);
>> >  } finally {
>> >  producer.close();
>> >  }
>> >
>> > When we try this with a large file with a million records, only 
>> > half of
>> them around 500,000 get written to the topic. In the above example, I 
>> verified this by running the GetOffset tool after fair amount of time 
>> (to ensure all records had finished processing) as follows:
>> >
>> >
>> >  ./kafka-run-class.sh kafka.tools.GetOffsetShell 
>> > --broker-list  --time -1 --topic 
>> >
>> >
>> >
>> >
>> > The output of this was :
>> >
>> >
>> >  topic_name:1:292954
>> >
>> >  topic_name:0:296787
>> >
>> >
>> > What could be causing this dropping of records?
>> >
>> > Thanks,
>> > Varun
>> >
>>
>>
>


Re: Data (re)processing with Kafka (new wiki page)

2016-11-25 Thread Michael Noll
Thanks a lot, Matthias!

I have already begun to provide feedback.

-Michael



On Wed, Nov 23, 2016 at 11:41 PM, Matthias J. Sax 
wrote:

> Hi,
>
> we added a new wiki page that is supposed to collect data (re)processing
> scenario with Kafka:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Data+(Re)
> Processing+Scenarios
>
> We added already a couple of scenarios we think might be common and want
> to invite all of you to add more. This helps to get a better overview of
> requirements to enable new use cases.
>
> We are looking forward to your feedback!
>
>
> -Matthias
>
>


kafka balance partition data across directory locations after partition created

2016-11-25 Thread Yuanjia
Hi all,
In our Kafka cluster, we configure multiple directories using the "log.dirs"
property. Kafka balances the partition data directories across these given
directory locations when a new topic is created.
With an increase in data size, some disks may become full while the others are
not. So we need to move some partition data to a different location.
Does Kafka have a balancer to do that automatically, or are there some utils
that can help do that?

Thanks,
Regards,



Yuanjia Li


Re: KafkaStreams KTable#through not creating changelog topic

2016-11-25 Thread Michael Noll
Mikael,

> Sure, I guess the topic is auto-created the first time I start the
topology
> and the second time its there already. It could be possible to create
> topics up front for us, or even use an admin call from inside the code.

Yes, that (i.e. you are running with auto-topic creation enabled) was what
I implicitly understood.  As covered in [1] we strongly recommend to
manually pre-create/manage user topics though.  User topics include the
source topics that you are reading from (cf. `stream()`, `table()`) but
also include the topics you use in `through()` and `to()`.


> Today there is .through(topic, store) and
> .to(topic), maybe it would be possible to have something like
> .materialize(store) which takes care of topic creation? Would adding
> something like this require a KIP?

There is already work being done in the Admin API (KIP-4), and part of this
functionality was released in the latest Kafka versions.  You can use this
to programmatically create topics, for example.  Note though that the work
on KIP-4 is not fully completed yet.

-Michael




[1]
http://docs.confluent.io/current/streams/developer-guide.html#managing-topics-of-a-kafka-streams-application



On Thu, Nov 24, 2016 at 3:55 PM, Mikael Högqvist 
wrote:

> Sure, I guess the topic is auto-created the first time I start the topology
> and the second time its there already. It could be possible to create
> topics up front for us, or even use an admin call from inside the code.
>
> That said, as a user, I think it would be great with a function in the
> Kafka Streams DSL that would allow me to materialize a KTable without
> pre-creating the topic. Today there is .through(topic, store) and
> .to(topic), maybe it would be possible to have something like
> .materialize(store) which takes care of topic creation? Would adding
> something like this require a KIP?
>
> Best,
> Mikael
>
> On Thu, Nov 24, 2016 at 1:44 PM Damian Guy  wrote:
>
> Mikeal,
>
> When you use `through(..)` topics are not created by KafkaStreams. You need
> to create them yourself before you run the application.
>
> Thanks,
> Damian
>
> On Thu, 24 Nov 2016 at 11:27 Mikael Högqvist  wrote:
>
> > Yes, the naming is not an issue.
> >
> > I've tested this with the topology described earlier. Every time I start
> > the topology with a call to .through() that references a topic that does
> > not exist, I get an exception from the UncaughtExceptionHandler:
> >
> > Uncaught exception org.apache.kafka.streams.errors.StreamsException:
> Topic
> > not found during partition assignment: words-count-changelog
> >
> > This happens when .through("words-count-changelog", "count") is part of
> the
> > topology. The topology is also not forwarding anything to that
> topic/store.
> > After restarting the application it works fine.
> >
> > Are the changelog topics created via, for example, .aggregate() different
> > to topics auto created via .through()?
> >
> > Thanks,
> > Mikael
> >
> > On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax 
> > wrote:
> >
> > > > 1) Create a state store AND the changelog
> > > > topic 2) follow the Kafka Streams naming convention for changelog
> > topics.
> > > > Basically, I want to have a method that does what .through() is
> > supposed
> > > to
> > > > do according to the documentation, but without the "topic" parameter.
> > >
> > > I understand what you are saying, but you can get this done right now,
> > > too. If you use through(...) you will get the store. And you can just
> > > specify the topic name as "applicationId-storeName-changelog" to
> follow
> > > the naming convention Streams used internally. What is the problem
> using
> > > this approach (besides that you have to provide the topic name which
> > > seems not to be a big burden to me?)
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> > > > Hi Michael,
> > > >
> > > > thanks for the extensive explanation, and yes it definitely helps
> with
> > my
> > > > understanding of through(). :)
> > > >
> > > > You guessed correctly that I'm doing some "shenanings" where I'm
> trying
> > > to
> > > > derive the changelog of a state store from the state store name. This
> > > works
> > > > perfectly fine with with a naming convention for the topics and by
> > > creating
> > > > them in Kafka upfront.
> > > >
> > > > My point is that it would help me (and maybe others), if the API of
> > > KTable
> > > > was extended to have a new method that does two things that is not
> part
> > > of
> > > > the implementation of .through(). 1) Create a state store AND the
> > > changelog
> > > > topic 2) follow the Kafka Streams naming convention for changelog
> > topics.
> > > > Basically, I want to have a method that does what .through() is
> > supposed
> > > to
> > > > do according to the documentation, but without the "topic" parameter.
> > > >
> > > > What do you think, would it be