about java.io.EOFException / java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException

2017-03-17 Thread Selina Tech
Hi:
I am processing a new Kafka topic with Spark and I got the error
below. I googled this question, and it looks like a lot of people have had
similar problems before, but I don't have a clue yet.

   Does anyone know how to fix this issue?

Sincerely.
Selina


00:39:58,004 WARN  - 2017-03-18
00:39:57,921:7726(0x7f22e69b8700):ZOO_WARN@zookeeper_interest@1557:
Exceeded deadline by 28ms
00:41:01,298 WARN  - 17/03/18 00:41:01 WARN Selector: Error in I/O with /
10.128.64.152
00:41:01,298 WARN  - java.io.EOFException
00:41:01,298 WARN  - at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
00:41:01,298 WARN  - at
org.apache.kafka.common.network.Selector.poll(Selector.java:248)
00:41:01,298 WARN  - at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
00:41:01,298 WARN  - at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
00:41:01,298 WARN  - at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
00:41:01,298 WARN  - at java.lang.Thread.run(Thread.java:745)
00:43:31,514 WARN  - 2017-03-18
00:43:31,514:7726(0x7f22e69b8700):ZOO_WARN@zookeeper_interest@1557:
Exceeded deadline by 11ms
00:44:17,996 WARN  - 17/03/18 00:44:17 WARN ThrowableSerializationWrapper:
Task exception could not be deserialized
00:44:17,996 WARN  - java.lang.ClassNotFoundException:
kafka.common.OffsetOutOfRangeException
00:44:17,996 WARN  - at
java.net.URLClassLoader$1.run(URLClassLoader.java:366)
00:44:17,996 WARN  - at
java.net.URLClassLoader$1.run(URLClassLoader.java:355)
00:44:17,996 WARN  - at
java.security.AccessController.doPrivileged(Native Method)
00:44:17,996 WARN  - at
java.net.URLClassLoader.findClass(URLClassLoader.java:354)
00:44:17,996 WARN  - at
java.lang.ClassLoader.loadClass(ClassLoader.java:425)
00:44:17,996 WARN  - at
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
00:44:17,996 WARN  - at
java.lang.ClassLoader.loadClass(ClassLoader.java:358)
00:44:17,996 WARN  - at java.lang.Class.forName0(Native Method)
00:44:17,996 WARN  - at java.lang.Class.forName(Class.java:278)
00:44:17,996 WARN  - at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
00:44:17,996 WARN  - at
org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
00:44:17,996 WARN  - at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
00:44:17,996 WARN  - at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
00:44:17,996 WARN  - at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
00:44:17,996 WARN  - at java.lang.reflect.Method.invoke(Method.java:606)
00:44:17,996 WARN  - at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
00:44:17,996 WARN  - at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
00:44:17,996 WARN  - at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
00:44:17,996 WARN  - at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
00:44:17,996 WARN  - at
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
00:44:17,996 WARN  - at
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run

Re: Capacity planning for Kafka Streams

2017-03-17 Thread Mahendra Kariya
Thanks for the heads up Guozhang!

The problem is our brokers are on 0.10.0.x, so we will have to upgrade them.

On Sat, Mar 18, 2017 at 12:30 AM, Guozhang Wang  wrote:

> Hi Mahendra,
>
> Just a kind reminder that upgrading Streams to 0.10.2 does not necessarily
> require you to upgrade brokers to 0.10.2 as well. Since we have added a new
> feature since 0.10.2 to allow newer versioned clients (producer, consumer,
> streams) to talk to older versioned brokers, and for Streams specifically
> it only requires brokers to be no older than 0.10.1.
>
>
> Guozhang
>
>
> On Mon, Mar 13, 2017 at 5:12 AM, Mahendra Kariya <
> mahendra.kar...@go-jek.com
> > wrote:
>
> > We are planning to migrate to the newer version of Kafka. But that's a
> few
> > weeks away.
> >
> > We will try setting the socket config and see how it turns out.
> >
> > Thanks a lot for your response!
> >
> >
> >
> > On Mon, Mar 13, 2017 at 3:21 PM, Eno Thereska 
> > wrote:
> >
> > > Thanks,
> > >
> > > A couple of things:
> > > - I’d recommend moving to 0.10.2 (latest release) if you can since
> > several
> > > improvements were made in the last two releases that make rebalancing
> and
> > > performance better.
> > >
> > > - When running on environments with large latency on AWS at least
> > (haven’t
> > > tried Google cloud), one parameter we have found useful to increase
> > > performance is the receive and send socket size for the consumer and
> > > producer in streams. We’d recommend setting them to 1MB like this
> (where
> > > “props” is your own properties object when you start streams):
> > >
> > > // the socket buffer needs to be large, especially when running in AWS
> > with
> > > // high latency. if running locally the default is fine.
> > > props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
> > > props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
> > >
> > > Make sure the OS allows the larger socket size too.
> > >
> > > Thanks
> > > Eno
> > >
> > > > On Mar 13, 2017, at 9:21 AM, Mahendra Kariya <
> > mahendra.kar...@go-jek.com>
> > > wrote:
> > > >
> > > > Hi Eno,
> > > >
> > > > Please find my answers inline.
> > > >
> > > >
> > > > We are in the process of documenting capacity planning for streams,
> > stay
> > > tuned.
> > > >
> > > > This would be great! Looking forward to it.
> > > >
> > > > Could you send some more info on your problem? What Kafka version are
> > > you using?
> > > >
> > > > We are using Kafka 0.10.0.0.
> > > >
> > > > Are the VMs on the same or different hosts?
> > > >
> > > > The VMs are on Google Cloud. Two of them are in asia-east1-a and one
> is
> > > in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
> > > >
> > > > Also what exactly do you mean by “the lag keeps fluctuating”, what
> > > metric are you looking at?
> > > >
> > > > We are looking at Kafka Manager for the time being. By fluctuating, I
> > > mean the lag is few thousands at one time, we refresh it the next
> second,
> > > it is in few lakhs, and again refresh it and it is few thousands. I
> > > understand this may not be very accurate. We will soon have more
> accurate
> > > data once we start pushing the consumer lag metric to Datadog.
> > > >
> > > > But on a separate note, the difference between lags on different
> > > partitions is way too high. I have attached a tab separated file
> herewith
> > > which shows the consumer lag (from Kafka Manager) for the first the 50
> > > partitions. As is clear, the lag on partition 2 is 530 while the lag on
> > > partition 18 is 23K. Note that the same VM is pulling data from both
> the
> > > partitions.
> > > >
> > > >
> > > >
> > > >
> > > > 
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
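
As a concrete illustration of the socket-buffer settings Eno recommends in
the quoted thread above, a minimal sketch of building such a properties
object for a Streams application; the application id and broker address are
placeholders:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsSocketBuffers {
    static Properties buildProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "capacity-planning-demo"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");        // placeholder
        // 1 MB socket buffers for the embedded producer and consumer,
        // as recommended above for high-latency links.
        props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
        return props;
    }
}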


Re: kafka-topics[.sh]: fail to support connecting via broker / v0.10 style

2017-03-17 Thread Hans Jespersen
It can be updated once the Kafka AdminAPI is available and does everything
over the Kafka wire protocol that the current kafka-topics command does by
talking directly with ZooKeeper, for example creating or deleting a topic.
Unfortunately it has to remain this way for just a little while longer.

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Fri, Mar 17, 2017 at 1:20 PM, Andrew Pennebaker <
andrew.penneba...@gmail.com> wrote:

> If I understand Kafka correctly, since v0.9 / v0.10, users are often
> recommended to connect consumers to the Kafka cluster via bootstrap.servers
> AKA broker node addresses.
>
> However, the kafka-topics shell scripts fails to support this interface,
> still requiring the legacy zookeeper connect string.
>
> Could this be updated?
>
> --
> Cheers,
> Andrew
>


Re: Streams RocksDBException with no message?

2017-03-17 Thread Guozhang Wang
Hi Mathieu,

We have been aware of that for a long time and I have been looking into this
issue; it turns out to be a known issue in RocksDB:

https://github.com/facebook/rocksdb/issues/1688

And the corresponding fix (https://github.com/facebook/rocksdb/pull/1714)
has been merged into master, but it is marked for v5.1.4 only, while the
latest release is 5.1.2.


Guozhang


On Fri, Mar 17, 2017 at 10:27 AM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> Hey all,
>
> So... what does it mean to have a RocksDBException with a message that just
> has a single character?  "e", "q", "]"... I've seen a few.  Has anyone seen
> this before?
>
> Two example exceptions:
> https://gist.github.com/mfenniak/c56beb6d5058e2b21df0309aea224f12
>
> Kafka Streams 0.10.2.0.  Both of these errors occurred during state store
> initialization.  I'm running a single Kafka Streams thread per server, this
> occurred on two servers about a half-hour apart.
>
> Mathieu
>



-- 
-- Guozhang


Re: Reg: Kafka HDFS Connector with (HDFS SSL enabled)

2017-03-17 Thread BigData dev
Hi Colin,
I have configured SSL in HDFS and used SWebHDFS.
I am able to make it work with the Kafka HDFS Connector.
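
For reference, a rough sketch of what such a sink connector configuration
might look like; the host, port, topic, and flush.size below are placeholders,
not Bharat's actual settings:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=my_topic
flush.size=1000
# swebhdfs:// points the connector at the SSL-enabled WebHDFS endpoint
hdfs.url=swebhdfs://namenode.example.com:50470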


Thanks,
Bharat


On Fri, Feb 17, 2017 at 1:47 PM, Colin McCabe  wrote:

> Hi,
>
> Just to be clear, HDFS doesn't use HTTP or HTTPS as its primary
> transport mechanism.  Instead, HDFS uses the Hadoop RPC transport
> mechanism.  So in general, it should not be necessary to configure SSL
> to connect a client to HDFS.
>
> HDFS does "support SSL" in the sense that the HDFS web UI can be
> configured to use it.  You can also use HttpFS or WebHDFS to access
> HDFS, which might motivate you to configure SSL.  Are you trying to
> configure one of these?
>
> best,
> Colin
>
>
> On Wed, Feb 15, 2017, at 11:03, BigData dev wrote:
> > Hi,
> >
> > Does Kafka HDFS Connect work with HDFS (SSL). As I see only properties in
> > security is
> > hdfs.authentication.kerberos, connect.hdfs.keytab,hdfs.
> namenode.principal
> > as these properties are all related to HDFS Kerberos.
> >
> > As from the configuration and code I see we pass only Kerberos
> > parameters,
> > not seen SSL configuration, so want to confirm will the Kafka HDFS
> > Connector works with HDFS (SSL enabled)?
> >
> > Could you please provide any information on this.
> >
> >
> > Thanks
>


Re: Kafka Streams: lockException

2017-03-17 Thread Guozhang Wang
Tianji and Sachin (and also cc'ing people who I remember have reported
similar RocksDB memory issues),

Sharing my experience with RocksDB tuning and also chatting with the
RocksDB community:

1. If you are frequently flushing the state stores (e.g. with a high commit
frequency), you will end up with a huge number of very small memtable files,
and hence very high compaction pressure on RocksDB; if you use the default
number of compaction threads (1), it will not be able to catch up with the
write throughput and compaction rate, hence the gradual degradation of
performance. We have changed the default num.compaction.threads in trunk,
but if you are on released version 0.10.2 or older, check your store's flush
rate metrics and consider increasing the number of compaction threads (see
the first sketch below).

2. The most common memory leaks from the RocksDB JNI are iterator leaks.
Make sure to close the iterators returned from your range queries / fetches
on the stores when you are done. If you don't, the corresponding scanned
memory stays pinned and cannot be compacted (see the second sketch below).
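
Two sketches of the points above. First, one way to raise compaction
parallelism from an application, assuming the RocksDBConfigSetter hook
(available since 0.10.1) is the tuning point you want; the thread count of 4
is purely illustrative:

import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// Raises RocksDB's background parallelism for every state store.
public class MoreCompactionThreads implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        options.setIncreaseParallelism(4);       // background thread pool size
        options.setMaxBackgroundCompactions(4);  // up to 4 concurrent compactions
    }
}

This would be registered with
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MoreCompactionThreads.class).

Second, a minimal sketch of iterator hygiene with a hypothetical store named
"my-store"; KeyValueIterator is Closeable, so try-with-resources guarantees
the close:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class IteratorHygiene {
    // Sums the values in a key range, always closing the iterator.
    static long sumRange(KafkaStreams streams, String from, String to) {
        ReadOnlyKeyValueStore<String, Long> store =
                streams.store("my-store", QueryableStoreTypes.<String, Long>keyValueStore());
        long total = 0;
        try (KeyValueIterator<String, Long> iter = store.range(from, to)) {
            while (iter.hasNext()) {
                KeyValue<String, Long> entry = iter.next();
                total += entry.value;
            }
        } // iter.close() runs here, releasing the pinned resources
        return total;
    }
}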


Guozhang


On Fri, Mar 17, 2017 at 8:56 AM, Eno Thereska 
wrote:

> Sachin, you also have a PR for this that could help, right?:
> https://github.com/apache/kafka/pull/2642#issuecomment-287372367 <
> https://github.com/apache/kafka/pull/2642#issuecomment-287372367>.
>
> Thanks
> Eno
>
>
> > On 17 Mar 2017, at 15:19, Sachin Mittal  wrote:
> >
> > We also face same issues.
> > What we have found is that rocksdb is the issue. With many instances of
> > rocksdb per machine, over the time it slows down due to i/o operations,
> > resulting in threads getting evicted because max.poll.interval exceeds
> the
> > set limit.
> >
> > Try running rocksdb in memory https://github.com/facebook/
> > rocksdb/wiki/How-to-persist-in-memory-RocksDB-database%3F.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Fri, Mar 17, 2017 at 8:34 PM, Tianji Li  wrote:
> >
> >> Hi Eno,
> >>
> >> I used 150, 50, 20 threads and the probabilities of crashing decreased
> with
> >> this number. When using 1 thread, no crash!
> >>
> >> My max.poll.interval is 5 minutes and all the processing won't last that
> >> long, so that parameter does not help.
> >>
> >>
> >> Thanks
> >> Tianji
> >>
> >> On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska 
> >> wrote:
> >>
> >>> Hi Tianji,
> >>>
> >>> How many threads does your app use?
> >>>
> >>> One reason is explained here: https://groups.google.com/
> >>> forum/#!topic/confluent-platform/wgCSuwIJo5g <
> https://groups.google.com/
> >>> forum/#!topic/confluent-platform/wgCSuwIJo5g>, you might want to
> >> increase
> >>> max.poll.interval config value.
> >>> If that doesn't work, could you revert to using one thread for now.
> Also
> >>> let us know either way since we might need to open a bug report.
> >>>
> >>> Thanks
> >>> Eno
> >>>
>  On 16 Mar 2017, at 20:47, Tianji Li  wrote:
> 
>  Hi there,
> 
>  I always got this crashes and wonder if anyone knows why. Please let
> me
>  know what information I should provide to help with trouble shooting.
> 
>  I am using 0.10.2.0. My application is reading one topic and then
>  groupBy().aggregate() 50 times on different keys.
> 
>  I use memory store, without backing to kafka.
> 
>  Thanks
>  Tianji
> 
> 
>  2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14]
>  o.a.k.s.p.internals.StreamThread : Could not create task 0_4.
> >>> Will
>  retry.
> 
>  org.apache.kafka.streams.errors.LockException: task [0_4] Failed to
> >> lock
>  the state directory: /tmp/kafka-streams/xxx-test28/0_4
>    at
>  org.apache.kafka.streams.processor.internals.
> >>> ProcessorStateManager.(ProcessorStateManager.java:102)
>    at
>  org.apache.kafka.streams.processor.internals.AbstractTask.(
> >>> AbstractTask.java:73)
>    at
>  org.apache.kafka.streams.processor.internals.
> >>> StreamTask.(StreamTask.java:108)
>    at
>  org.apache.kafka.streams.processor.internals.
> >>> StreamThread.createStreamTask(StreamThread.java:834)
>    at
>  org.apache.kafka.streams.processor.internals.StreamThread$Ta
> skCreator.
> >>> createTask(StreamThread.java:1207)
>    at
>  org.apache.kafka.streams.processor.internals.StreamThread$
> >>> AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>    at
>  org.apache.kafka.streams.processor.internals.
> >>> StreamThread.addStreamTasks(StreamThread.java:937)
>    at
>  org.apache.kafka.streams.processor.internals.StreamThread.access$500(
> >>> StreamThread.java:69)
>    at
>  org.apache.kafka.streams.processor.internals.StreamThread$1.
> >>> onPartitionsAssigned(StreamThread.java:236)
>    at
>  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> >>> onJoinComplete(ConsumerCoordinator.java:255)
>    at
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>

kafka-topics[.sh]: fail to support connecting via broker / v0.10 style

2017-03-17 Thread Andrew Pennebaker
If I understand Kafka correctly, since v0.9 / v0.10, users are often
recommended to connect consumers to the Kafka cluster via bootstrap.servers
AKA broker node addresses.

However, the kafka-topics shell script fails to support this interface,
still requiring the legacy zookeeper connect string.

Could this be updated?

-- 
Cheers,
Andrew


Re: Kafka Streams: ReadOnlyKeyValueStore range behavior

2017-03-17 Thread Damian Guy
Thanks Dmitry. Please do create a JIRA for the range scan.
On Fri, 17 Mar 2017 at 18:01, Dmitry Minkovsky  wrote:

> Regarding the null bug: I had time to open a JIRA today. Looks like an
> issue already exists: https://issues.apache.org/jira/browse/KAFKA-4750
>
> Regarding scan order: I would gladly produce a sample that replicates this
> behavior if you can confirm that you will perceive this as a defect. I
> would really love to be able to do ordered prefixed range scans with
> interactive queries. But if you don't think the lack of this facility is a
> defect then I can't spend more time on this.
>
> Thank you!
>
> On Fri, Mar 17, 2017 at 1:18 PM, Dmitry Minkovsky 
> wrote:
>
> > Ah! Yes. Thank you! That make sense.
> >
> > Anyway, I _think_ that's not what I was doing given that all items were
> > being routed to and then read from a partition identified by one key.
> >
> > On Fri, Mar 17, 2017 at 12:50 PM, Damian Guy 
> wrote:
> >
> >> > When you use Queryable State you are actually querying multiple
> >>
> >> > underlying stores, i.e., one per partition.
> >> >
> >> > Huh? I was only querying one partition. In my example, I have a user's
> >> > posts. Upon creation, they are routed to a particular partition using
> a
> >> > partitioner that hashes the post's user ID. The posts are then indexed
> >> on
> >> > that partition by prefixed keys using the method described above. When
> >> > querying, I am only querying the one partition that has all of the
> >> user's
> >> > posts. As far as I know, I am not querying across multiple partitions.
> >> > Furthermore, I did not even think this was possible, given the fact
> that
> >> > Interactive Queries require you to manually forward requests that
> >> should go
> >> > to other partitions.
> >> >
> >> >
> >> Each KafkaStreams instance is potentially responsible for multiple
> >> partitions, so when you use Queryable State on a particular instance you
> >> are querying all partitions for that store on the given instance.
> >>
> >>
> >>
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > On Thu, Mar 16, 2017 at 2:11 PM, Damian Guy 
> >> wrote:
> >> >
> >> > > I think what you are seeing is that the order is not guaranteed
> across
> >> > > partitions. When you use Queryable State you are actually querying
> >> > multiple
> >> > > underlying stores, i.e., one per partition. The implementation
> >> iterates
> >> > > over one store/partition at a time, so the ordering will appear
> >> random.
> >> > > This could be improved
> >> > >
> >> > > The tombstone records appearing in the results seems like a bug.
> >> > >
> >> > > Thanks,
> >> > > Damian
> >> > >
> >> > > On Thu, 16 Mar 2017 at 17:37 Matthias J. Sax  >
> >> > > wrote:
> >> > >
> >> > > > Can you check if the problem exist for 0.10.2, too? (0.10.2 is
> >> > > > compatible to 0.10.1 broker -- so you can upgrade your Streams
> code
> >> > > > independently from the brokers).
> >> > > >
> >> > > > About the range: I did double check this, and I guess my last
> answer
> >> > was
> >> > > > not correct, and range() should return ordered data, but I got a
> >> follow
> >> > > > up question: what the key type and serializer you use? Internally,
> >> data
> >> > > > is stored in serialized form and ordered according to
> >> > > > `LexicographicByteArrayComparator` -- thus, if the serialized
> bytes
> >> > > > don't reflect the order of the deserialized data, it returned
> range
> >> > > > shows up unordered to you.
> >> > > >
> >> > > >
> >> > > > -Matthias
> >> > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > On 3/16/17 10:14 AM, Dmitry Minkovsky wrote:
> >> > > > > Hi Matthias. Thank you for your response.
> >> > > > >
> >> > > > > Yes, I was able to reproduce the null issue reliably. I can't
> >> open a
> >> > > JIRA
> >> > > > > at this time, but I can say I was using 0.10.1.0 and it was
> >> trivial
> >> > to
> >> > > > > reproduce. Just send records and the tombstones to a table
> topic.
> >> > Then
> >> > > > scan
> >> > > > > the range. You'll see the tombstones.
> >> > > > >
> >> > > > > Indeed, ranges are returned with no specific order. I'm not sure
> >> what
> >> > > you
> >> > > > > mean that default stores are hash-based, but this ordering thing
> >> is a
> >> > > > shame
> >> > > > > because it kind of kills the ability to use KS as a full fledged
> >> DB
> >> > > that
> >> > > > > lets you index things like HBase (composite keys for lists of
> >> items).
> >> > > Is
> >> > > > > that how RocksDB works? Just returns range scans in random
> order?
> >> I
> >> > > don't
> >> > > > > know C++ so the documentation is a bit opaque to me. But what's
> >> the
> >> > > point
> >> > > > > of scanning a range if the data comes in some random order? That
> >> > being
> >> > > > the
> >> > > > > case, the number of possible use-case scenarios seem to become
> >> > > > > significantly limited.
> >> > > > >
> >> > > > >
> >> > > > > Thank you!
> >> > > > > Dmitry
> >> > > > >
> >> > > > > On Tue, Mar 14, 2017 at 1:12 P

Re: Capacity planning for Kafka Streams

2017-03-17 Thread Guozhang Wang
Hi Mahendra,

Just a kind reminder that upgrading Streams to 0.10.2 does not necessarily
require you to upgrade brokers to 0.10.2 as well. In 0.10.2 we added a new
feature that allows newer-versioned clients (producer, consumer, streams) to
talk to older-versioned brokers; for Streams specifically, it only requires
brokers to be no older than 0.10.1.


Guozhang


On Mon, Mar 13, 2017 at 5:12 AM, Mahendra Kariya  wrote:

> We are planning to migrate to the newer version of Kafka. But that's a few
> weeks away.
>
> We will try setting the socket config and see how it turns out.
>
> Thanks a lot for your response!
>
>
>
> On Mon, Mar 13, 2017 at 3:21 PM, Eno Thereska 
> wrote:
>
> > Thanks,
> >
> > A couple of things:
> > - I’d recommend moving to 0.10.2 (latest release) if you can since
> several
> > improvements were made in the last two releases that make rebalancing and
> > performance better.
> >
> > - When running on environments with large latency on AWS at least
> (haven’t
> > tried Google cloud), one parameter we have found useful to increase
> > performance is the receive and send socket size for the consumer and
> > producer in streams. We’d recommend setting them to 1MB like this (where
> > “props” is your own properties object when you start streams):
> >
> > // the socket buffer needs to be large, especially when running in AWS
> with
> > // high latency. if running locally the default is fine.
> > props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
> > props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
> >
> > Make sure the OS allows the larger socket size too.
> >
> > Thanks
> > Eno
> >
> > > On Mar 13, 2017, at 9:21 AM, Mahendra Kariya <
> mahendra.kar...@go-jek.com>
> > wrote:
> > >
> > > Hi Eno,
> > >
> > > Please find my answers inline.
> > >
> > >
> > > We are in the process of documenting capacity planning for streams,
> stay
> > tuned.
> > >
> > > This would be great! Looking forward to it.
> > >
> > > Could you send some more info on your problem? What Kafka version are
> > you using?
> > >
> > > We are using Kafka 0.10.0.0.
> > >
> > > Are the VMs on the same or different hosts?
> > >
> > > The VMs are on Google Cloud. Two of them are in asia-east1-a and one is
> > in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
> > >
> > > Also what exactly do you mean by “the lag keeps fluctuating”, what
> > metric are you looking at?
> > >
> > > We are looking at Kafka Manager for the time being. By fluctuating, I
> > mean the lag is few thousands at one time, we refresh it the next second,
> > it is in few lakhs, and again refresh it and it is few thousands. I
> > understand this may not be very accurate. We will soon have more accurate
> > data once we start pushing the consumer lag metric to Datadog.
> > >
> > > But on a separate note, the difference between lags on different
> > partitions is way too high. I have attached a tab separated file herewith
> > which shows the consumer lag (from Kafka Manager) for the first the 50
> > partitions. As is clear, the lag on partition 2 is 530 while the lag on
> > partition 18 is 23K. Note that the same VM is pulling data from both the
> > partitions.
> > >
> > >
> > >
> > >
> > > 
> >
> >
>



-- 
-- Guozhang


Re: Kafka Streams: ReadOnlyKeyValueStore range behavior

2017-03-17 Thread Dmitry Minkovsky
Regarding the null bug: I had time to open a JIRA today. Looks like an
issue already exists: https://issues.apache.org/jira/browse/KAFKA-4750

Regarding scan order: I would gladly produce a sample that replicates this
behavior if you can confirm that you will perceive this as a defect. I
would really love to be able to do ordered prefixed range scans with
interactive queries. But if you don't think the lack of this facility is a
defect then I can't spend more time on this.

Thank you!

On Fri, Mar 17, 2017 at 1:18 PM, Dmitry Minkovsky 
wrote:

> Ah! Yes. Thank you! That make sense.
>
> Anyway, I _think_ that's not what I was doing given that all items were
> being routed to and then read from a partition identified by one key.
>
> On Fri, Mar 17, 2017 at 12:50 PM, Damian Guy  wrote:
>
>> > When you use Queryable State you are actually querying multiple
>>
>> > underlying stores, i.e., one per partition.
>> >
>> > Huh? I was only querying one partition. In my example, I have a user's
>> > posts. Upon creation, they are routed to a particular partition using a
>> > partitioner that hashes the post's user ID. The posts are then indexed
>> on
>> > that partition by prefixed keys using the method described above. When
>> > querying, I am only querying the one partition that has all of the
>> user's
>> > posts. As far as I know, I am not querying across multiple partitions.
>> > Furthermore, I did not even think this was possible, given the fact that
>> > Interactive Queries require you to manually forward requests that
>> should go
>> > to other partitions.
>> >
>> >
>> Each KafkaStreams instance is potentially responsible for multiple
>> partitions, so when you use Queryable State on a particular instance you
>> are querying all partitions for that store on the given instance.
>>
>>
>>
>> >
>> >
>> >
>> >
>> >
>> > On Thu, Mar 16, 2017 at 2:11 PM, Damian Guy 
>> wrote:
>> >
>> > > I think what you are seeing is that the order is not guaranteed across
>> > > partitions. When you use Queryable State you are actually querying
>> > multiple
>> > > underlying stores, i.e., one per partition. The implementation
>> iterates
>> > > over one store/partition at a time, so the ordering will appear
>> random.
>> > > This could be improved
>> > >
>> > > The tombstone records appearing in the results seems like a bug.
>> > >
>> > > Thanks,
>> > > Damian
>> > >
>> > > On Thu, 16 Mar 2017 at 17:37 Matthias J. Sax 
>> > > wrote:
>> > >
>> > > > Can you check if the problem exist for 0.10.2, too? (0.10.2 is
>> > > > compatible to 0.10.1 broker -- so you can upgrade your Streams code
>> > > > independently from the brokers).
>> > > >
>> > > > About the range: I did double check this, and I guess my last answer
>> > was
>> > > > not correct, and range() should return ordered data, but I got a
>> follow
>> > > > up question: what the key type and serializer you use? Internally,
>> data
>> > > > is stored in serialized form and ordered according to
>> > > > `LexicographicByteArrayComparator` -- thus, if the serialized bytes
>> > > > don't reflect the order of the deserialized data, it returned range
>> > > > shows up unordered to you.
>> > > >
>> > > >
>> > > > -Matthias
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > On 3/16/17 10:14 AM, Dmitry Minkovsky wrote:
>> > > > > Hi Matthias. Thank you for your response.
>> > > > >
>> > > > > Yes, I was able to reproduce the null issue reliably. I can't
>> open a
>> > > JIRA
>> > > > > at this time, but I can say I was using 0.10.1.0 and it was
>> trivial
>> > to
>> > > > > reproduce. Just send records and the tombstones to a table topic.
>> > Then
>> > > > scan
>> > > > > the range. You'll see the tombstones.
>> > > > >
>> > > > > Indeed, ranges are returned with no specific order. I'm not sure
>> what
>> > > you
>> > > > > mean that default stores are hash-based, but this ordering thing
>> is a
>> > > > shame
>> > > > > because it kind of kills the ability to use KS as a full fledged
>> DB
>> > > that
>> > > > > lets you index things like HBase (composite keys for lists of
>> items).
>> > > Is
>> > > > > that how RocksDB works? Just returns range scans in random order?
>> I
>> > > don't
>> > > > > know C++ so the documentation is a bit opaque to me. But what's
>> the
>> > > point
>> > > > > of scanning a range if the data comes in some random order? That
>> > being
>> > > > the
>> > > > > case, the number of possible use-case scenarios seem to become
>> > > > > significantly limited.
>> > > > >
>> > > > >
>> > > > > Thank you!
>> > > > > Dmitry
>> > > > >
>> > > > > On Tue, Mar 14, 2017 at 1:12 PM, Matthias J. Sax <
>> > > matth...@confluent.io>
>> > > > > wrote:
>> > > > >
>> > > > >>> However,
>> > > >  for keys that have been tombstoned, it does return null for me.
>> > > > >>
>> > > > >> Sound like a bug. Can you reliable reproduce this? Would you mind
>> > > > >> opening a JIRA?
>> > > > >>
>> > > > >> Can you check if this happens for both cases: caching enabled and
>> > > > >> d

Streams RocksDBException with no message?

2017-03-17 Thread Mathieu Fenniak
Hey all,

So... what does it mean to have a RocksDBException with a message that just
has a single character?  "e", "q", "]"... I've seen a few.  Has anyone seen
this before?

Two example exceptions:
https://gist.github.com/mfenniak/c56beb6d5058e2b21df0309aea224f12

Kafka Streams 0.10.2.0.  Both of these errors occurred during state store
initialization.  I'm running a single Kafka Streams thread per server, this
occurred on two servers about a half-hour apart.

Mathieu


Re: Kafka Streams: ReadOnlyKeyValueStore range behavior

2017-03-17 Thread Dmitry Minkovsky
Ah! Yes. Thank you! That makes sense.

Anyway, I _think_ that's not what I was doing given that all items were
being routed to and then read from a partition identified by one key.

On Fri, Mar 17, 2017 at 12:50 PM, Damian Guy  wrote:

> > When you use Queryable State you are actually querying multiple
>
> > underlying stores, i.e., one per partition.
> >
> > Huh? I was only querying one partition. In my example, I have a user's
> > posts. Upon creation, they are routed to a particular partition using a
> > partitioner that hashes the post's user ID. The posts are then indexed on
> > that partition by prefixed keys using the method described above. When
> > querying, I am only querying the one partition that has all of the user's
> > posts. As far as I know, I am not querying across multiple partitions.
> > Furthermore, I did not even think this was possible, given the fact that
> > Interactive Queries require you to manually forward requests that should
> go
> > to other partitions.
> >
> >
> Each KafkaStreams instance is potentially responsible for multiple
> partitions, so when you use Queryable State on a particular instance you
> are querying all partitions for that store on the given instance.
>
>
>
> >
> >
> >
> >
> >
> > On Thu, Mar 16, 2017 at 2:11 PM, Damian Guy 
> wrote:
> >
> > > I think what you are seeing is that the order is not guaranteed across
> > > partitions. When you use Queryable State you are actually querying
> > multiple
> > > underlying stores, i.e., one per partition. The implementation iterates
> > > over one store/partition at a time, so the ordering will appear random.
> > > This could be improved
> > >
> > > The tombstone records appearing in the results seems like a bug.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Thu, 16 Mar 2017 at 17:37 Matthias J. Sax 
> > > wrote:
> > >
> > > > Can you check if the problem exist for 0.10.2, too? (0.10.2 is
> > > > compatible to 0.10.1 broker -- so you can upgrade your Streams code
> > > > independently from the brokers).
> > > >
> > > > About the range: I did double check this, and I guess my last answer
> > was
> > > > not correct, and range() should return ordered data, but I got a
> follow
> > > > up question: what the key type and serializer you use? Internally,
> data
> > > > is stored in serialized form and ordered according to
> > > > `LexicographicByteArrayComparator` -- thus, if the serialized bytes
> > > > don't reflect the order of the deserialized data, it returned range
> > > > shows up unordered to you.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > >
> > > >
> > > > On 3/16/17 10:14 AM, Dmitry Minkovsky wrote:
> > > > > Hi Matthias. Thank you for your response.
> > > > >
> > > > > Yes, I was able to reproduce the null issue reliably. I can't open
> a
> > > JIRA
> > > > > at this time, but I can say I was using 0.10.1.0 and it was trivial
> > to
> > > > > reproduce. Just send records and the tombstones to a table topic.
> > Then
> > > > scan
> > > > > the range. You'll see the tombstones.
> > > > >
> > > > > Indeed, ranges are returned with no specific order. I'm not sure
> what
> > > you
> > > > > mean that default stores are hash-based, but this ordering thing
> is a
> > > > shame
> > > > > because it kind of kills the ability to use KS as a full fledged DB
> > > that
> > > > > lets you index things like HBase (composite keys for lists of
> items).
> > > Is
> > > > > that how RocksDB works? Just returns range scans in random order? I
> > > don't
> > > > > know C++ so the documentation is a bit opaque to me. But what's the
> > > point
> > > > > of scanning a range if the data comes in some random order? That
> > being
> > > > the
> > > > > case, the number of possible use-case scenarios seem to become
> > > > > significantly limited.
> > > > >
> > > > >
> > > > > Thank you!
> > > > > Dmitry
> > > > >
> > > > > On Tue, Mar 14, 2017 at 1:12 PM, Matthias J. Sax <
> > > matth...@confluent.io>
> > > > > wrote:
> > > > >
> > > > >>> However,
> > > >  for keys that have been tombstoned, it does return null for me.
> > > > >>
> > > > >> Sound like a bug. Can you reliable reproduce this? Would you mind
> > > > >> opening a JIRA?
> > > > >>
> > > > >> Can you check if this happens for both cases: caching enabled and
> > > > >> disabled? Or only for once case?
> > > > >>
> > > > >>
> > > > >>> "No ordering guarantees are provided."
> > > > >>
> > > > >> That is correct. Internally, default stores are hash-based --
> thus,
> > we
> > > > >> don't give a sorted list/iterator back. You could replace RocksDB
> > > with a
> > > > >> custom store though.
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >>
> > > > >> On 3/13/17 3:56 PM, Dmitry Minkovsky wrote:
> > > > >>> I am using interactive streams to query tables:
> > > > >>>
> > > > >>> ReadOnlyKeyValueStore > > > >>> Messages.UserLetter> store
> > > > >>>   = streams.store("view-user-drafts",
> > > > >>> QueryableStoreTypes.keyVal

Re: Kafka Streams: ReadOnlyKeyValueStore range behavior

2017-03-17 Thread Damian Guy
> When you use Queryable State you are actually querying multiple

> underlying stores, i.e., one per partition.
>
> Huh? I was only querying one partition. In my example, I have a user's
> posts. Upon creation, they are routed to a particular partition using a
> partitioner that hashes the post's user ID. The posts are then indexed on
> that partition by prefixed keys using the method described above. When
> querying, I am only querying the one partition that has all of the user's
> posts. As far as I know, I am not querying across multiple partitions.
> Furthermore, I did not even think this was possible, given the fact that
> Interactive Queries require you to manually forward requests that should go
> to other partitions.
>
>
Each KafkaStreams instance is potentially responsible for multiple
partitions, so when you use Queryable State on a particular instance you
are querying all partitions for that store on the given instance.
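
A minimal sketch of how an instance can discover which host owns the
partition for a given key before forwarding an Interactive Query; the store
name and key serializer are whatever your application uses:

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.StreamsMetadata;

public class QueryRouting {
    // Returns host:port of the instance that owns the partition for `key`.
    static <K> String ownerOf(KafkaStreams streams, String store,
                              K key, Serializer<K> keySerializer) {
        StreamsMetadata meta = streams.metadataForKey(store, key, keySerializer);
        return meta.host() + ":" + meta.port();
    }
}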



>
>
>
>
>
> On Thu, Mar 16, 2017 at 2:11 PM, Damian Guy  wrote:
>
> > I think what you are seeing is that the order is not guaranteed across
> > partitions. When you use Queryable State you are actually querying
> multiple
> > underlying stores, i.e., one per partition. The implementation iterates
> > over one store/partition at a time, so the ordering will appear random.
> > This could be improved
> >
> > The tombstone records appearing in the results seems like a bug.
> >
> > Thanks,
> > Damian
> >
> > On Thu, 16 Mar 2017 at 17:37 Matthias J. Sax 
> > wrote:
> >
> > > Can you check if the problem exist for 0.10.2, too? (0.10.2 is
> > > compatible to 0.10.1 broker -- so you can upgrade your Streams code
> > > independently from the brokers).
> > >
> > > About the range: I did double check this, and I guess my last answer
> was
> > > not correct, and range() should return ordered data, but I got a follow
> > > up question: what the key type and serializer you use? Internally, data
> > > is stored in serialized form and ordered according to
> > > `LexicographicByteArrayComparator` -- thus, if the serialized bytes
> > > don't reflect the order of the deserialized data, it returned range
> > > shows up unordered to you.
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > >
> > > On 3/16/17 10:14 AM, Dmitry Minkovsky wrote:
> > > > Hi Matthias. Thank you for your response.
> > > >
> > > > Yes, I was able to reproduce the null issue reliably. I can't open a
> > JIRA
> > > > at this time, but I can say I was using 0.10.1.0 and it was trivial
> to
> > > > reproduce. Just send records and the tombstones to a table topic.
> Then
> > > scan
> > > > the range. You'll see the tombstones.
> > > >
> > > > Indeed, ranges are returned with no specific order. I'm not sure what
> > you
> > > > mean that default stores are hash-based, but this ordering thing is a
> > > shame
> > > > because it kind of kills the ability to use KS as a full fledged DB
> > that
> > > > lets you index things like HBase (composite keys for lists of items).
> > Is
> > > > that how RocksDB works? Just returns range scans in random order? I
> > don't
> > > > know C++ so the documentation is a bit opaque to me. But what's the
> > point
> > > > of scanning a range if the data comes in some random order? That
> being
> > > the
> > > > case, the number of possible use-case scenarios seem to become
> > > > significantly limited.
> > > >
> > > >
> > > > Thank you!
> > > > Dmitry
> > > >
> > > > On Tue, Mar 14, 2017 at 1:12 PM, Matthias J. Sax <
> > matth...@confluent.io>
> > > > wrote:
> > > >
> > > >>> However,
> > >  for keys that have been tombstoned, it does return null for me.
> > > >>
> > > >> Sound like a bug. Can you reliable reproduce this? Would you mind
> > > >> opening a JIRA?
> > > >>
> > > >> Can you check if this happens for both cases: caching enabled and
> > > >> disabled? Or only for once case?
> > > >>
> > > >>
> > > >>> "No ordering guarantees are provided."
> > > >>
> > > >> That is correct. Internally, default stores are hash-based -- thus,
> we
> > > >> don't give a sorted list/iterator back. You could replace RocksDB
> > with a
> > > >> custom store though.
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >> On 3/13/17 3:56 PM, Dmitry Minkovsky wrote:
> > > >>> I am using interactive streams to query tables:
> > > >>>
> > > >>> ReadOnlyKeyValueStore > > >>> Messages.UserLetter> store
> > > >>>   = streams.store("view-user-drafts",
> > > >>> QueryableStoreTypes.keyValueStore());
> > > >>>
> > > >>> Documentation says that #range() should not return null values.
> > > However,
> > > >>> for keys that have been tombstoned, it does return null for me.
> > > >>>
> > > >>> Also, I noticed only just now that "No ordering guarantees are
> > > >> provided." I
> > > >>> haven't done enough testing or looked at the code carefully enough
> > yet
> > > >> and
> > > >>> wonder if someone who knows could confirm: is this true? Is this
> > common
> > > >> to
> > > >>> all store i

Re: Kafka Streams: ReadOnlyKeyValueStore range behavior

2017-03-17 Thread Dmitry Minkovsky
Matthias, Damian:

Thank you for your replies.

> Can you check if the problem exist for 0.10.2, too?

I will upgrade to 0.10.2 after this development cycle. I'm still in
development so compatibility is not as big an issue as getting to
production.

>  range() should return ordered data,

In my experiments, the order in which the data was returned the first time
is the order it was returned all subsequent times. But that order was not
lexicographic; it was seemingly random.

> what the key type and serializer you use?

I am using Protocol Buffers, which are ordered structs. You construct the
protocol buffers object, and then the serializer calls ".toByteArray()" on
it to get the bytes. I thought this would be a very simple way to create keys
that when serialized would facilitate prefixed range scans. For example, a
Protocol Buffer message like

message {
   bytes user_id = 1;
   bytes post_id = 2;
}

when serialized puts the user_id first, then the post_id in the total byte
string. Some Protocol Buffers data types use variable-length encoding, so I
was careful not to use any of these types in my keys.
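
Since the store orders entries by the serialized bytes (the
LexicographicByteArrayComparator mentioned below), one way to sanity-check a
key serializer, as a sketch, is to compare serialized keys the same way
RocksDB's default comparator does, i.e. as unsigned bytes:

public class KeyOrderCheck {
    // Compares two serialized keys as unsigned bytes, matching RocksDB's
    // default lexicographic byte ordering.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return a.length - b.length;
    }
}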

> When you use Queryable State you are actually querying multiple
> underlying stores, i.e., one per partition.

Huh? I was only querying one partition. In my example, I have a user's
posts. Upon creation, they are routed to a particular partition using a
partitioner that hashes the post's user ID. The posts are then indexed on
that partition by prefixed keys using the method described above. When
querying, I am only querying the one partition that has all of the user's
posts. As far as I know, I am not querying across multiple partitions.
Furthermore, I did not even think this was possible, given the fact that
Interactive Queries require you to manually forward requests that should go
to other partitions.






On Thu, Mar 16, 2017 at 2:11 PM, Damian Guy  wrote:

> I think what you are seeing is that the order is not guaranteed across
> partitions. When you use Queryable State you are actually querying multiple
> underlying stores, i.e., one per partition. The implementation iterates
> over one store/partition at a time, so the ordering will appear random.
> This could be improved
>
> The tombstone records appearing in the results seems like a bug.
>
> Thanks,
> Damian
>
> On Thu, 16 Mar 2017 at 17:37 Matthias J. Sax 
> wrote:
>
> > Can you check if the problem exist for 0.10.2, too? (0.10.2 is
> > compatible to 0.10.1 broker -- so you can upgrade your Streams code
> > independently from the brokers).
> >
> > About the range: I did double check this, and I guess my last answer was
> > not correct, and range() should return ordered data, but I got a follow
> > up question: what the key type and serializer you use? Internally, data
> > is stored in serialized form and ordered according to
> > `LexicographicByteArrayComparator` -- thus, if the serialized bytes
> > don't reflect the order of the deserialized data, it returned range
> > shows up unordered to you.
> >
> >
> > -Matthias
> >
> >
> >
> >
> > On 3/16/17 10:14 AM, Dmitry Minkovsky wrote:
> > > Hi Matthias. Thank you for your response.
> > >
> > > Yes, I was able to reproduce the null issue reliably. I can't open a
> JIRA
> > > at this time, but I can say I was using 0.10.1.0 and it was trivial to
> > > reproduce. Just send records and the tombstones to a table topic. Then
> > scan
> > > the range. You'll see the tombstones.
> > >
> > > Indeed, ranges are returned with no specific order. I'm not sure what
> you
> > > mean that default stores are hash-based, but this ordering thing is a
> > shame
> > > because it kind of kills the ability to use KS as a full fledged DB
> that
> > > lets you index things like HBase (composite keys for lists of items).
> Is
> > > that how RocksDB works? Just returns range scans in random order? I
> don't
> > > know C++ so the documentation is a bit opaque to me. But what's the
> point
> > > of scanning a range if the data comes in some random order? That being
> > the
> > > case, the number of possible use-case scenarios seem to become
> > > significantly limited.
> > >
> > >
> > > Thank you!
> > > Dmitry
> > >
> > > On Tue, Mar 14, 2017 at 1:12 PM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > >>> However,
> >  for keys that have been tombstoned, it does return null for me.
> > >>
> > >> Sound like a bug. Can you reliable reproduce this? Would you mind
> > >> opening a JIRA?
> > >>
> > >> Can you check if this happens for both cases: caching enabled and
> > >> disabled? Or only for once case?
> > >>
> > >>
> > >>> "No ordering guarantees are provided."
> > >>
> > >> That is correct. Internally, default stores are hash-based -- thus, we
> > >> don't give a sorted list/iterator back. You could replace RocksDB
> with a
> > >> custom store though.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 3/13/17 3:56 PM, Dmitry Minkovsky wrote:
> > >>> I am using interactive streams to query tables:
> > >>>
> > 

Re: Kafka Streams: lockException

2017-03-17 Thread Eno Thereska
Sachin, you also have a PR for this that could help, right?:
https://github.com/apache/kafka/pull/2642#issuecomment-287372367

Thanks
Eno


> On 17 Mar 2017, at 15:19, Sachin Mittal  wrote:
> 
> We also face same issues.
> What we have found is that rocksdb is the issue. With many instances of
> rocksdb per machine, over the time it slows down due to i/o operations,
> resulting in threads getting evicted because max.poll.interval exceeds the
> set limit.
> 
> Try running rocksdb in memory https://github.com/facebook/
> rocksdb/wiki/How-to-persist-in-memory-RocksDB-database%3F.
> 
> Thanks
> Sachin
> 
> 
> 
> On Fri, Mar 17, 2017 at 8:34 PM, Tianji Li  wrote:
> 
>> Hi Eno,
>> 
>> I used 150, 50, 20 threads and the probabilities of crashing decreased with
>> this number. When using 1 thread, no crash!
>> 
>> My max.poll.interval is 5 minutes and all the processing won't last that
>> long, so that parameter does not help.
>> 
>> 
>> Thanks
>> Tianji
>> 
>> On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska 
>> wrote:
>> 
>>> Hi Tianji,
>>> 
>>> How many threads does your app use?
>>> 
>>> One reason is explained here: https://groups.google.com/
>>> forum/#!topic/confluent-platform/wgCSuwIJo5g >> forum/#!topic/confluent-platform/wgCSuwIJo5g>, you might want to
>> increase
>>> max.poll.interval config value.
>>> If that doesn't work, could you revert to using one thread for now. Also
>>> let us know either way since we might need to open a bug report.
>>> 
>>> Thanks
>>> Eno
>>> 
 On 16 Mar 2017, at 20:47, Tianji Li  wrote:
 
 Hi there,
 
 I always got this crashes and wonder if anyone knows why. Please let me
 know what information I should provide to help with trouble shooting.
 
 I am using 0.10.2.0. My application is reading one topic and then
 groupBy().aggregate() 50 times on different keys.
 
 I use memory store, without backing to kafka.
 
 Thanks
 Tianji
 
 
 2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14]
 o.a.k.s.p.internals.StreamThread : Could not create task 0_4.
>>> Will
 retry.
 
 org.apache.kafka.streams.errors.LockException: task [0_4] Failed to
>> lock
 the state directory: /tmp/kafka-streams/xxx-test28/0_4
   at
 org.apache.kafka.streams.processor.internals.
>>> ProcessorStateManager.(ProcessorStateManager.java:102)
   at
 org.apache.kafka.streams.processor.internals.AbstractTask.(
>>> AbstractTask.java:73)
   at
 org.apache.kafka.streams.processor.internals.
>>> StreamTask.(StreamTask.java:108)
   at
 org.apache.kafka.streams.processor.internals.
>>> StreamThread.createStreamTask(StreamThread.java:834)
   at
 org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.
>>> createTask(StreamThread.java:1207)
   at
 org.apache.kafka.streams.processor.internals.StreamThread$
>>> AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
   at
 org.apache.kafka.streams.processor.internals.
>>> StreamThread.addStreamTasks(StreamThread.java:937)
   at
 org.apache.kafka.streams.processor.internals.StreamThread.access$500(
>>> StreamThread.java:69)
   at
 org.apache.kafka.streams.processor.internals.StreamThread$1.
>>> onPartitionsAssigned(StreamThread.java:236)
   at
 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
>>> onJoinComplete(ConsumerCoordinator.java:255)
   at
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>>> joinGroupIfNeeded(AbstractCoordinator.java:339)
   at
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>>> ensureActiveGroup(AbstractCoordinator.java:303)
   at
 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
>>> ConsumerCoordinator.java:286)
   at
 org.apache.kafka.clients.consumer.KafkaConsumer.
>>> pollOnce(KafkaConsumer.java:1030)
   at
 org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>>> KafkaConsumer.java:995)
   at
 org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>>> StreamThread.java:582)
   at
 org.apache.kafka.streams.processor.internals.
>>> StreamThread.run(StreamThread.java:368)
>>> 
>>> 
>> 



Re: Increasing partition count and preserving local order for a key

2017-03-17 Thread Ian Wrigley
Hi

You can’t move existing records between partitions, but one possibility would 
be to create a new topic with the required number of partitions, then copy the 
data from the original topic to the new one. The default partitioning algorithm 
would ensure that all records with the same key in the new topic would be in 
the same partition.
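
A rough sketch of such a copy job using the plain Java clients; the broker
address and topic names are placeholders, and the loop is left running until
the copy has caught up, at which point you would stop it manually:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TopicCopier {
    public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put("bootstrap.servers", "broker1:9092");   // placeholder
        cProps.put("group.id", "topic-copier");
        cProps.put("auto.offset.reset", "earliest");
        cProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        cProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        Properties pProps = new Properties();
        pProps.put("bootstrap.servers", "broker1:9092");   // placeholder
        pProps.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        pProps.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(pProps)) {
            consumer.subscribe(Collections.singletonList("old-topic"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // Re-sending with the same key lets the default partitioner
                    // place all of a key's records in one partition of the new topic.
                    producer.send(new ProducerRecord<>("new-topic",
                            record.key(), record.value()));
                }
            }
        }
    }
}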

Ian.


> On Mar 17, 2017, at 1:38 AM, Auel, Tarek  wrote:
> 
> Hi,
> 
> I would like to increase the number of partitions in a topic while 
> preserving the local order within a key. I do know that if I have a simple 
> hashing algorithm, say h(x) = x mod partitionCnt, the order is not preserved 
> if I increase the number of partitions. My goal is that a consumer is able to 
> consume messages for a specific key in-order.
> 
> I think something like this is not implemented in Kafka, but maybe there are 
> already best-practice patterns or some kind of library that implements this. 
> I'd highly appreciate if you have some pointers for me.
> 
> Cheers
> Tarek



Re: Kafka Streams: lockException

2017-03-17 Thread Sachin Mittal
We also face the same issues.
What we have found is that RocksDB is the issue. With many instances of
RocksDB per machine, over time it slows down due to I/O operations,
resulting in threads getting evicted because max.poll.interval exceeds the
set limit.

Try running rocksdb in memory https://github.com/facebook/
rocksdb/wiki/How-to-persist-in-memory-RocksDB-database%3F.

Thanks
Sachin



On Fri, Mar 17, 2017 at 8:34 PM, Tianji Li  wrote:

> Hi Eno,
>
> I used 150, 50, 20 threads and the probabilities of crashing decreased with
> this number. When using 1 thread, no crash!
>
> My max.poll.interval is 5 minutes and all the processing won't last that
> long, so that parameter does not help.
>
>
> Thanks
> Tianji
>
> On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska 
> wrote:
>
> > Hi Tianji,
> >
> > How many threads does your app use?
> >
> > One reason is explained here: https://groups.google.com/
> > forum/#!topic/confluent-platform/wgCSuwIJo5g  > forum/#!topic/confluent-platform/wgCSuwIJo5g>, you might want to
> increase
> > max.poll.interval config value.
> > If that doesn't work, could you revert to using one thread for now. Also
> > let us know either way since we might need to open a bug report.
> >
> > Thanks
> > Eno
> >
> > > On 16 Mar 2017, at 20:47, Tianji Li  wrote:
> > >
> > > Hi there,
> > >
> > > I always got this crashes and wonder if anyone knows why. Please let me
> > > know what information I should provide to help with trouble shooting.
> > >
> > > I am using 0.10.2.0. My application is reading one topic and then
> > > groupBy().aggregate() 50 times on different keys.
> > >
> > > I use memory store, without backing to kafka.
> > >
> > > Thanks
> > > Tianji
> > >
> > >
> > > 2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14]
> > > o.a.k.s.p.internals.StreamThread : Could not create task 0_4.
> > Will
> > > retry.
> > >
> > > org.apache.kafka.streams.errors.LockException: task [0_4] Failed to
> lock
> > > the state directory: /tmp/kafka-streams/xxx-test28/0_4
> > >at
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.(ProcessorStateManager.java:102)
> > >at
> > > org.apache.kafka.streams.processor.internals.AbstractTask.(
> > AbstractTask.java:73)
> > >at
> > > org.apache.kafka.streams.processor.internals.
> > StreamTask.(StreamTask.java:108)
> > >at
> > > org.apache.kafka.streams.processor.internals.
> > StreamThread.createStreamTask(StreamThread.java:834)
> > >at
> > > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.
> > createTask(StreamThread.java:1207)
> > >at
> > > org.apache.kafka.streams.processor.internals.StreamThread$
> > AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> > >at
> > > org.apache.kafka.streams.processor.internals.
> > StreamThread.addStreamTasks(StreamThread.java:937)
> > >at
> > > org.apache.kafka.streams.processor.internals.StreamThread.access$500(
> > StreamThread.java:69)
> > >at
> > > org.apache.kafka.streams.processor.internals.StreamThread$1.
> > onPartitionsAssigned(StreamThread.java:236)
> > >at
> > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> > onJoinComplete(ConsumerCoordinator.java:255)
> > >at
> > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > joinGroupIfNeeded(AbstractCoordinator.java:339)
> > >at
> > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > ensureActiveGroup(AbstractCoordinator.java:303)
> > >at
> > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> > ConsumerCoordinator.java:286)
> > >at
> > > org.apache.kafka.clients.consumer.KafkaConsumer.
> > pollOnce(KafkaConsumer.java:1030)
> > >at
> > > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > KafkaConsumer.java:995)
> > >at
> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:582)
> > >at
> > > org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:368)
> >
> >
>


Re: Offset commit request failing

2017-03-17 Thread Robert Quinlivan
Thanks for the response. Reading through that thread, it appears that this
issue was addressed with KAFKA-3810. This change eases the
restriction on fetch size between replicas. However, should the outcome be
a more comprehensive change to the serialization format of the request? The
size of the group metadata currently grows linearly with the number of
topic-partitions. This is difficult to tune for in a configuration using
topic auto creation.



On Fri, Mar 17, 2017 at 3:17 AM, James Cheng  wrote:

> I think it's due to the high number of partitions and the high number of
> consumers in the group. The group coordination info to keep track of the
> assignments actually happens via a message that travels through the
> __consumer_offsets topic. So with so many partitions and consumers, the
> message gets too big to go through the topic.
>
> There is a long thread here that discusses it. I don't remember what
> specific actions came out of that discussion. http://search-hadoop.com/m/
> Kafka/uyzND1yd26N1rFtRd1?subj=+DISCUSS+scalability+limits+
> in+the+coordinator
>
> -James
>
> Sent from my iPhone
>
> > On Mar 15, 2017, at 9:40 AM, Robert Quinlivan wrote:
> >
> > I should also mention that this error was seen on broker version
> 0.10.1.1.
> > I found that this condition sounds somewhat similar to KAFKA-4362, but
> > that issue was
> > submitted in 0.10.1.1 so they appear to be different issues.
> >
> > On Wed, Mar 15, 2017 at 11:11 AM, Robert Quinlivan wrote:
> >
> >> Good morning,
> >>
> >> I'm hoping for some help understanding the expected behavior for an
> offset
> >> commit request and why this request might fail on the broker.
> >>
> >> *Context:*
> >>
> >> For context, my configuration looks like this:
> >>
> >>   - Three brokers
> >>   - Consumer offsets topic replication factor set to 3
> >>   - Auto commit enabled
> >>   - The user application topic, which I will call "my_topic", has a
> >>   replication factor of 3 as well and 800 partitions
> >>   - 4000 consumers attached in consumer group "my_group"
> >>
> >>
> >> *Issue:*
> >>
> >> When I attach the consumers, the coordinator logs the following error
> >> message repeatedly for each generation:
> >>
> >> ERROR [Group Metadata Manager on Broker 0]: Appending metadata message
> for
> >> group my_group generation 2066 failed due to org.apache.kafka.common.
> >> errors.RecordTooLargeException, returning UNKNOWN error code to the
> >> client (kafka.coordinator.GroupMetadataManager)
> >>
> >> *Observed behavior:*
> >>
> >> The consumer group does not stay connected long enough to consume
> >> messages. It is effectively stuck in a rebalance loop and the "my_topic"
> >> data has become unavailable.
> >>
> >>
> >> *Investigation:*
> >>
> >> Following the Group Metadata Manager code, it looks like the broker is
> >> writing to a cache after it writes an Offset Commit Request to the log
> >> file. If this cache write fails, the broker then logs this error and
> >> returns an error code in the response. In this case, the error from the
> >> cache is MESSAGE_TOO_LARGE, which is logged as a
> RecordTooLargeException.
> >> However, the broker then sets the error code to UNKNOWN on the Offset
> >> Commit Response.
> >>
> >> It seems that the issue is the size of the metadata in the Offset Commit
> >> Request. I have the following questions:
> >>
> >>   1. What is the size limit for this request? Are we exceeding the size
> >>   which is causing this request to fail?
> >>   2. If this is an issue with metadata size, what would cause abnormally
> >>   large metadata?
> >>   3. How is this cache used within the broker?
> >>
> >>
> >> Thanks in advance for any insights you can provide.
> >>
> >> Regards,
> >> Robert Quinlivan
> >> Software Engineer, Signal
> >>
> >
> >
> >
> > --
> > Robert Quinlivan
> > Software Engineer, Signal
>



-- 
Robert Quinlivan
Software Engineer, Signal


Re: Kafka Streams: lockException

2017-03-17 Thread Tianji Li
Hi Eno,

I tried 150, 50, and 20 threads, and the probability of crashing decreased
along with the thread count. When using 1 thread, no crash!

My max.poll.interval.ms is 5 minutes and none of the processing lasts that
long, so that parameter does not help.
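
(For reference, a minimal sketch of the two knobs discussed here, assuming
the 0.10.2 Streams API; the application id, broker address, and timeout
value are placeholder assumptions:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);         // the single-thread workaround
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // raise above the 5-minute default

Raising max.poll.interval.ms only helps when rebalances are triggered by
slow processing, which does not appear to be the case here.)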


Thanks
Tianji

On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska wrote:

> Hi Tianji,
>
> How many threads does your app use?
>
> One reason is explained here: https://groups.google.com/
> forum/#!topic/confluent-platform/wgCSuwIJo5g, and you might want to
> increase the max.poll.interval.ms config value.
> If that doesn't work, could you revert to using one thread for now? Also
> let us know either way, since we might need to open a bug report.
>
> Thanks
> Eno
>
> > On 16 Mar 2017, at 20:47, Tianji Li wrote:
> >
> > Hi there,
> >
> > I always get these crashes and wonder if anyone knows why. Please let me
> > know what information I should provide to help with troubleshooting.
> >
> > I am using 0.10.2.0. My application reads one topic and then calls
> > groupBy().aggregate() 50 times on different keys.
> >
> > I use the in-memory store, without backing it up to Kafka.
> >
> > Thanks
> > Tianji
> >
> >
> > 2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14]
> > o.a.k.s.p.internals.StreamThread : Could not create task 0_4.
> Will
> > retry.
> >
> > org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock
> > the state directory: /tmp/kafka-streams/xxx-test28/0_4
> >at
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.(ProcessorStateManager.java:102)
> >at
> > org.apache.kafka.streams.processor.internals.AbstractTask.(
> AbstractTask.java:73)
> >at
> > org.apache.kafka.streams.processor.internals.
> StreamTask.(StreamTask.java:108)
> >at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.createStreamTask(StreamThread.java:834)
> >at
> > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.
> createTask(StreamThread.java:1207)
> >at
> > org.apache.kafka.streams.processor.internals.StreamThread$
> AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> >at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.addStreamTasks(StreamThread.java:937)
> >at
> > org.apache.kafka.streams.processor.internals.StreamThread.access$500(
> StreamThread.java:69)
> >at
> > org.apache.kafka.streams.processor.internals.StreamThread$1.
> onPartitionsAssigned(StreamThread.java:236)
> >at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> onJoinComplete(ConsumerCoordinator.java:255)
> >at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:339)
> >at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:303)
> >at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:286)
> >at
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1030)
> >at
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> >at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:582)
> >at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:368)
>
>


Increasing partition count and preserving local order for a key

2017-03-17 Thread Auel, Tarek
Hi,

I would like to increase the number of partitions in a topic while preserving
the local order within a key. I do know that if I have a simple hashing
algorithm, say h(x) = x mod partitionCnt, the order is not preserved if I
increase the number of partitions. My goal is that a consumer is able to
consume messages for a specific key in order.

I think something like this is not implemented in Kafka, but maybe there are
already best-practice patterns or some kind of library that implements this.
I'd highly appreciate it if you have some pointers for me.
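
One hedged sketch of such a pattern (not a built-in Kafka facility): a
custom producer Partitioner that keeps hashing against the original
partition count, so existing keys never move when partitions are added. The
class name and the original count are assumed examples:

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;

    // Pins every key to the partition it mapped to before the expansion.
    // Trade-off: new partitions only receive keys you explicitly migrate,
    // so local order is preserved at the cost of balance.
    public class FixedModPartitioner implements Partitioner {
        private static final int ORIGINAL_PARTITION_COUNT = 8; // assumed

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % ORIGINAL_PARTITION_COUNT;
        }

        @Override
        public void close() {}

        @Override
        public void configure(Map<String, ?> configs) {}
    }

It would be registered through the producer's partitioner.class setting. A
fuller scheme would switch the modulus for a key only after consumers have
drained all of that key's pre-expansion messages.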

Cheers
Tarek


Real Time Streaming With Multiple Data Sources

2017-03-17 Thread 6yvu7u+1evsxnxv0
Hi,

We are planning to build a real-time monitoring system with Apache Kafka. The
overall idea is to push data from multiple data sources to Kafka and perform
data quality checks. I have a few questions about this architecture:

1. What are the best possible approaches for streaming data from multiple
sources, which mainly include Java applications, Oracle databases, REST APIs,
and log files, to Apache Kafka? Note that each client deployment includes each
of these data sources, so the number of data sources pushing data to Kafka
would be equal to the number of customers * x, where x is the number of data
source types listed. Ideally a push approach would suit best instead of a pull
approach: in the pull approach the target system would have to be configured
with the credentials of the various source systems, which would not be
practical.
2. How do we handle failures?
3. How do we perform data quality checks on the incoming messages? For
example, if a certain message does not have all the required attributes, the
message could be discarded and an alert could be raised for the maintenance
team to check; a sketch of such a check follows below.

Kindly let me know your expert inputs. Thanks!
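
On question 3, a minimal sketch of a validation pass with Kafka Streams; the
topic names, the required-attribute check, and the serdes are all
assumptions (real code would parse the payload properly):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class QualityCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "quality-check");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            KStreamBuilder builder = new KStreamBuilder();
            KStream<String, String> raw = builder.stream("ingest-raw");

            // branch() splits the stream: index 0 = records passing the
            // quality check, index 1 = everything else.
            KStream<String, String>[] branches = raw.branch(
                    (key, value) -> value != null && value.contains("\"required_field\""),
                    (key, value) -> true);

            branches[0].to("ingest-valid");
            branches[1].to("ingest-quarantine"); // an alerting consumer reads this topic

            new KafkaStreams(builder, new StreamsConfig(props)).start();
        }
    }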


Re: Offset commit request failing

2017-03-17 Thread James Cheng
I think it's due to the high number of partitions and the high number of 
consumers in the group. The group coordination info that keeps track of the
assignments actually travels via a message through the
__consumer_offsets topic. So with so many partitions and consumers, the message 
gets too big to go through the topic.

There is a long thread here that discusses it. I don't remember what specific 
actions came out of that discussion. 
http://search-hadoop.com/m/Kafka/uyzND1yd26N1rFtRd1?subj=+DISCUSS+scalability+limits+in+the+coordinator

-James

Sent from my iPhone

> On Mar 15, 2017, at 9:40 AM, Robert Quinlivan wrote:
> 
> I should also mention that this error was seen on broker version 0.10.1.1.
> I found that this condition sounds somewhat similar to KAFKA-4362, but
> that issue was
> submitted in 0.10.1.1 so they appear to be different issues.
> 
> On Wed, Mar 15, 2017 at 11:11 AM, Robert Quinlivan wrote:
> 
>> Good morning,
>> 
>> I'm hoping for some help understanding the expected behavior for an offset
>> commit request and why this request might fail on the broker.
>> 
>> *Context:*
>> 
>> For context, my configuration looks like this:
>> 
>>   - Three brokers
>>   - Consumer offsets topic replication factor set to 3
>>   - Auto commit enabled
>>   - The user application topic, which I will call "my_topic", has a
>>   replication factor of 3 as well and 800 partitions
>>   - 4000 consumers attached in consumer group "my_group"
>> 
>> 
>> *Issue:*
>> 
>> When I attach the consumers, the coordinator logs the following error
>> message repeatedly for each generation:
>> 
>> ERROR [Group Metadata Manager on Broker 0]: Appending metadata message for
>> group my_group generation 2066 failed due to org.apache.kafka.common.
>> errors.RecordTooLargeException, returning UNKNOWN error code to the
>> client (kafka.coordinator.GroupMetadataManager)
>> 
>> *Observed behavior:*
>> 
>> The consumer group does not stay connected long enough to consume
>> messages. It is effectively stuck in a rebalance loop and the "my_topic"
>> data has become unavailable.
>> 
>> 
>> *Investigation:*
>> 
>> Following the Group Metadata Manager code, it looks like the broker is
>> writing to a cache after it writes an Offset Commit Request to the log
>> file. If this cache write fails, the broker then logs this error and
>> returns an error code in the response. In this case, the error from the
>> cache is MESSAGE_TOO_LARGE, which is logged as a RecordTooLargeException.
>> However, the broker then sets the error code to UNKNOWN on the Offset
>> Commit Response.
>> 
>> It seems that the issue is the size of the metadata in the Offset Commit
>> Request. I have the following questions:
>> 
>>   1. What is the size limit for this request? Are we exceeding the size
>>   which is causing this request to fail?
>>   2. If this is an issue with metadata size, what would cause abnormally
>>   large metadata?
>>   3. How is this cache used within the broker?
>> 
>> 
>> Thanks in advance for any insights you can provide.
>> 
>> Regards,
>> Robert Quinlivan
>> Software Engineer, Signal
>> 
> 
> 
> 
> -- 
> Robert Quinlivan
> Software Engineer, Signal