Re: Consumer Offsets and Open FDs

2016-07-13 Thread Tom Crayford
Hi,

You're running into the issue in
https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-3894 and
possibly
https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-3587
(which is fixed in 0.10). Sadly right now there's no way to know how high a
dedupe buffer size you need - it depends on the write throughput and number
of unique keys going to that topic. For now I'd recommend:

a) Upgrade to 0.10 as KAFKA-3587 is fixed there. Kafka doesn't backport
patches (as far as I'm aware), so you need to upgrade.
b) Monitor and alert on the log-cleaner thread dying. This can be done by
getting a thread dump over JMX, loading the thread names, and ensuring one
containing "log-cleaner" is always running. Alternatively, monitoring the number
of log segments for compacted topics, or the number of file descriptors, will
serve as an OK proxy. When this thread does crash, you have to remediate by
increasing the dedupe buffer size.
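
For reference, a minimal sketch of check (b), assuming the broker exposes remote
JMX and that the service URL below points at it (the host and port are
hypothetical); it simply dumps the broker's threads and alerts if none has
"log-cleaner" in its name:

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class LogCleanerCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical broker JMX endpoint; adjust host/port to your deployment.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            ThreadMXBean threads = ManagementFactory.newPlatformMXBeanProxy(
                    mbsc, ManagementFactory.THREAD_MXBEAN_NAME, ThreadMXBean.class);
            boolean cleanerAlive = false;
            // Look for a thread named like "kafka-log-cleaner-thread-0".
            for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
                if (info != null && info.getThreadName().contains("log-cleaner")) {
                    cleanerAlive = true;
                }
            }
            System.out.println(cleanerAlive
                    ? "OK: log-cleaner thread is running"
                    : "ALERT: no log-cleaner thread found");
        } finally {
            connector.close();
        }
    }
}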

We're exploring solutions in KAFKA-3894, and would love your feedback there
if you have any thoughts.

Thanks

Tom Crayford
Heroku Kafka

On Wednesday, 13 July 2016, Rakesh Vidyadharan 
wrote:

> We ran into this as well, and I ended up with the following that works for
> us.
>
> log.cleaner.dedupe.buffer.size=536870912
> log.cleaner.io.buffer.size=2000
>
>
>
>
>
> On 13/07/2016 14:01, "Lawrence Weikum" >
> wrote:
>
> >Apologies. Here is the full trace from a broker:
> >
> >[2016-06-24 09:57:39,881] ERROR [kafka-log-cleaner-thread-0], Error due
> to  (kafka.log.LogCleaner)
> >java.lang.IllegalArgumentException: requirement failed: 9730197928
> messages in segment __consumer_offsets-36/.log but
> offset map can fit only 5033164. You can increase
> log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
> >at scala.Predef$.require(Predef.scala:219)
> >at
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> >at
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> >at
> scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> >at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
> >at kafka.log.Cleaner.clean(LogCleaner.scala:322)
> >at
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
> >at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
> >at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> >[2016-06-24 09:57:39,881] INFO [kafka-log-cleaner-thread-0], Stopped
> (kafka.log.LogCleaner)
> >
> >
> >Is log.cleaner.dedupe.buffer.size a broker setting?  What is a good
> number to set it to?
> >
> >
> >
> >Lawrence Weikum
> >
> >
> >On 7/13/16, 11:18 AM, "Manikumar Reddy"  > wrote:
> >
> >Can you post the complete error stack trace?   Yes, you need to
> >restart the affected
> >brokers.
> >You can tweak log.cleaner.dedupe.buffer.size, log.cleaner.io.buffer.size
> >configs.
> >
> >Some related JIRAs:
> >
> >https://issues.apache.org/jira/browse/KAFKA-3587
> >https://issues.apache.org/jira/browse/KAFKA-3894
> >https://issues.apache.org/jira/browse/KAFKA-3915
> >
> >On Wed, Jul 13, 2016 at 10:36 PM, Lawrence Weikum  >
> >wrote:
> >
> >> Oh interesting. I didn’t know about that log file until now.
> >>
> >> The only error that has been populated among all brokers showing this
> >> behavior is:
> >>
> >> ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> >>
> >> Then we see many messages like this:
> >>
> >> INFO Compaction for partition [__consumer_offsets,30] is resumed
> >> (kafka.log.LogCleaner)
> >> INFO The cleaning for partition [__consumer_offsets,30] is aborted
> >> (kafka.log.LogCleaner)
> >>
> >> Using Visual VM, I do not see any log-cleaner threads in those
> brokers.  I
> >> do see it in the brokers not showing this behavior though.
> >>
> >> Any idea why the LogCleaner failed?
> >>
> >> As a temporary fix, should we restart the affected brokers?
> >>
> >> Thanks again!
> >>
> >>
> >> Lawrence Weikum
> >>
> >> On 7/13/16, 10:34 AM, "Manikumar Reddy"  > wrote:
> >>
> >> Hi,
> >>
> >> Are you seeing any errors in log-cleaner.log?  The log-cleaner thread
> can
> >> crash on certain errors.
> >>
> >> Thanks
> >> Manikumar
> >>
> >> On Wed, Jul 13, 2016 at 9:54 PM, Lawrence Weikum  >
> >> wrote:
> >>
> >> > Hello,
> >> >
> >> > We’re seeing a strange behavior in Kafka 0.9.0.1 which occurs about
> every
> >> > other week.  I’m curious if others have seen it and know of a
> solution.
> >> >
> >> > Setup and Scenario:
> >> >
> >> > -  Brokers initially setup with log compaction turned off
> >> >
> >> > -  After 30 days, log compaction was turned on
> >> >
> >> > -  At this time, the number of Open FDs was ~ 30K per broker.
> >> >
> >> > -  After 2 days, the __consumer_offsets topic was compacted
> >> > fully.  Open FDs reduced to ~5K per broker.
> >> >
> >> > -  Cluster has been under normal load for rou

Re: Streams Compatibility

2016-07-13 Thread Sharninder
requires 0.10

On Thu, Jul 14, 2016 at 6:08 AM, Matt Anderson 
wrote:

> Is the new Kafka Streams API compatible with Kafka 0.9.x API and Broker or
> does it require v0.10.x?
>
> Thanks,
> Matt
>



-- 
--
Sharninder


Re: KStream-to-KStream Join Example

2016-07-13 Thread vivek thakre
Yes, both topics have the same number of partitions and the same partition key,
i.e. userId.
If I just join the streams without applying the map functions (in this case
userClickStream and userEventStream), it works.

Thanks,
Vivek


On Wed, Jul 13, 2016 at 4:53 PM, Philippe Derome  wrote:

> Did you specify same number of partitions for the two input topics you are
> joining? I think that this is usually the first thing people ask to verify
> with errors similar to yours.
>
> If you are experimenting with learning some concepts, it is simpler to
> always use one partition for your topics.
> On 13 Jul 2016 7:40 p.m., "vivek thakre"  wrote:
>
> > Hello,
> >
> > I want to join 2 Topics (KStreams)
> >
> >
> > Stream 1
> > Topic :  userIdClicks
> > Key : userId
> > Value : JSON String with event details
> >
> > Stream 2
> > Topic :  userIdChannel
> > Key : userId
> > Value : JSON String  with event details and has channel value
> >
> > I could not find any examples with KStream-to-KStream Join.
> >
> > Here is my code
> >
> > //build stream userIdClicks
> > > KStream userClickStream = builder.stream(stringSerde,
> > stringSerde,
> > > "userClicks");
> > >
> >
> >
> > > //create stream -> < userId, 1 (count) >
> > > KStream *userClickCountStream* = userClickStream.filter((
> > > userId,record)-> userId != null) .map((userId,record) -> new
> KeyValue<>(
> > > userId,1l));
> > >
> >
> >
> > > //build stream userChannelStream
> > > KStream userEventStream = builder.stream(stringSerde,
> > > stringSerde, "userEvents");
> > >
> >
> >
> > > //create stream  : extract channel value from json
> > string
> > > KStream *userChannelStream* =  userEventStream
> > > .filter((userId,record)-> userId != null)
> > > .map((userId,record) -> new KeyValue<>(userId
> > > ,JsonPath.read(record, "$.event.page.channel").toString()));
> > >
> >
> >
> > > //join *userClickCountStream* with
> > > *userChannelStream*KTable clicksPerChannel =
> > > userClickCountStream
> > > .join(userChannelStream, new ValueJoiner > > ChannelWithClicks>() {
> > >  @Override
> > >  public ChannelWithClicks apply(Long clicks, String
> channel)
> > {
> > > return new ChannelWithClicks(channel == null ?
> "UNKNOWN"
> > > : channel, clicks);
> > >  }
> > >  },
> > JoinWindows.of("ClicksPerChannelwindowed").after(3).before(3))
> > > //30 secs before and after
> > > .map((user, channelWithClicks) -> new
> > KeyValue<>(channelWithClicks
> > > .getChannel(), channelWithClicks.getClicks()))
> > > .reduceByKey(
> > > (firstClicks, secondClicks) -> firstClicks +
> > > secondClicks,
> > >  stringSerde, longSerde,
> "ClicksPerChannelUnwindowed"
> > > );
> >
> > When I run this topology, I get an exception
> >
> > Invalid topology building: KSTREAM-MAP-03 and
> > KSTREAM-MAP-06 are not joinable
> >
> > I am looking for a way to join 2 KStreams.
> >
> > Thanks,
> >
> > Vivek
> >
>


KafkaConsumer poll(timeout) doesn't seem to work as expected

2016-07-13 Thread Josh Goodrich
The poll(timeout) method of the Java KafkaConsumer API doesn't behave the way
you would think. If you create a new Consumer with a groupId that has been seen
before, then even if there are new events in the topic, a poll(0) never returns
any records. I find I have to put in a loop of two poll(500) calls, where the
first call pretty much always returns nothing but the second returns some
records (though not always all that are available). Why does this happen, and
what's the solution?



Josh
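
A minimal sketch of the loop described above, assuming a local broker on
localhost:9092, a previously seen group id, and a topic named "events" (all
hypothetical). It keeps polling with a non-zero timeout for a few seconds
instead of relying on a single poll(0):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // hypothetical broker
        props.put("group.id", "previously-seen-group");       // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events")); // hypothetical topic
            long deadline = System.currentTimeMillis() + 10_000;     // poll for up to 10 seconds
            while (System.currentTimeMillis() < deadline) {
                // Early polls may return nothing while the consumer joins the group
                // and starts fetching; later polls return records as they arrive.
                ConsumerRecords<String, String> records = consumer.poll(500);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%d: %s -> %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}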


Streams Compatibility

2016-07-13 Thread Matt Anderson
Is the new Kafka Streams API compatible with Kafka 0.9.x API and Broker or does 
it require v0.10.x?

Thanks,
Matt


Re: KStream-to-KStream Join Example

2016-07-13 Thread Philippe Derome
Did you specify same number of partitions for the two input topics you are
joining? I think that this is usually the first thing people ask to verify
with errors similar to yours.

If you are experimenting with learning some concepts, it is simpler to
always use one partition for your topics.
On 13 Jul 2016 7:40 p.m., "vivek thakre"  wrote:

> Hello,
>
> I want to join 2 Topics (KStreams)
>
>
> Stream 1
> Topic :  userIdClicks
> Key : userId
> Value : JSON String with event details
>
> Stream 2
> Topic :  userIdChannel
> Key : userId
> Value : JSON String  with event details and has channel value
>
> I could not find any examples with KStream-to-KStream Join.
>
> Here is my code
>
> //build stream userIdClicks
> > KStream userClickStream = builder.stream(stringSerde,
> stringSerde,
> > "userClicks");
> >
>
>
> > //create stream -> < userId, 1 (count) >
> > KStream *userClickCountStream* = userClickStream.filter((
> > userId,record)-> userId != null) .map((userId,record) -> new KeyValue<>(
> > userId,1l));
> >
>
>
> > //build stream userChannelStream
> > KStream userEventStream = builder.stream(stringSerde,
> > stringSerde, "userEvents");
> >
>
>
> > //create stream  : extract channel value from json
> string
> > KStream *userChannelStream* =  userEventStream
> > .filter((userId,record)-> userId != null)
> > .map((userId,record) -> new KeyValue<>(userId
> > ,JsonPath.read(record, "$.event.page.channel").toString()));
> >
>
>
> > //join *userClickCountStream* with
> > *userChannelStream*KTable clicksPerChannel =
> > userClickCountStream
> > .join(userChannelStream, new ValueJoiner > ChannelWithClicks>() {
> >  @Override
> >  public ChannelWithClicks apply(Long clicks, String channel)
> {
> > return new ChannelWithClicks(channel == null ? "UNKNOWN"
> > : channel, clicks);
> >  }
> >  },
> JoinWindows.of("ClicksPerChannelwindowed").after(3).before(3))
> > //30 secs before and after
> > .map((user, channelWithClicks) -> new
> KeyValue<>(channelWithClicks
> > .getChannel(), channelWithClicks.getClicks()))
> > .reduceByKey(
> > (firstClicks, secondClicks) -> firstClicks +
> > secondClicks,
> >  stringSerde, longSerde, "ClicksPerChannelUnwindowed"
> > );
>
> When I run this topology, I get an exception
>
> Invalid topology building: KSTREAM-MAP-03 and
> KSTREAM-MAP-06 are not joinable
>
> I am looking for a way to join 2 KStreams.
>
> Thanks,
>
> Vivek
>


KStream-to-KStream Join Example

2016-07-13 Thread vivek thakre
Hello,

I want to join 2 Topics (KStreams)


Stream 1
Topic :  userIdClicks
Key : userId
Value : JSON String with event details

Stream 2
Topic :  userIdChannel
Key : userId
Value : JSON String  with event details and has channel value

I could not find any examples with KStream-to-KStream Join.

Here is my code

//build stream userIdClicks
> KStream userClickStream = builder.stream(stringSerde, 
> stringSerde,
> "userClicks");
>


> //create stream -> < userId, 1 (count) >
> KStream *userClickCountStream* = userClickStream.filter((
> userId,record)-> userId != null) .map((userId,record) -> new KeyValue<>(
> userId,1l));
>


> //build stream userChannelStream
> KStream userEventStream = builder.stream(stringSerde,
> stringSerde, "userEvents");
>


> //create stream  : extract channel value from json string
> KStream *userChannelStream* =  userEventStream
> .filter((userId,record)-> userId != null)
> .map((userId,record) -> new KeyValue<>(userId
> ,JsonPath.read(record, "$.event.page.channel").toString()));
>


> //join *userClickCountStream* with
> *userChannelStream*KTable clicksPerChannel =
> userClickCountStream
> .join(userChannelStream, new ValueJoiner ChannelWithClicks>() {
>  @Override
>  public ChannelWithClicks apply(Long clicks, String channel) {
> return new ChannelWithClicks(channel == null ? "UNKNOWN"
> : channel, clicks);
>  }
>  }, 
> JoinWindows.of("ClicksPerChannelwindowed").after(3).before(3))
> //30 secs before and after
> .map((user, channelWithClicks) -> new KeyValue<>(channelWithClicks
> .getChannel(), channelWithClicks.getClicks()))
> .reduceByKey(
> (firstClicks, secondClicks) -> firstClicks +
> secondClicks,
>  stringSerde, longSerde, "ClicksPerChannelUnwindowed"
> );

When I run this topology, I get an exception

Invalid topology building: KSTREAM-MAP-03 and
KSTREAM-MAP-06 are not joinable

I am looking for a way to join 2 KStreams.

Thanks,

Vivek


kafka-connect-hdfs offset out of range behaviour

2016-07-13 Thread Prabhu V
kafka-connect-hdfs just hangs if the offset it expects is no longer present
(this happens when messages get deleted because of the retention time).

The process in this case does not write any output, and the messages get
ignored.

Is this by design?

The relevant code is

TopicPartitionWriter.java

if (offset == -1) {
  offset = record.kafkaOffset();
} else if (record.kafkaOffset() != expectedOffset) {
  // Currently it's possible to see stale data with the wrong offset after a rebalance
  // when you rewind, which we do since we manage our own offsets. See KAFKA-2894.
  if (!sawInvalidOffset) {
    log.info(
        "Ignoring stale out-of-order record in {}-{}. Has offset {} instead of expected offset {}",
        record.topic(), record.kafkaPartition(), record.kafkaOffset(), expectedOffset);
  }
  sawInvalidOffset = true;
  return;
}

In the "else if" we should not ignore the message if the
record.kafkaOffset() is greater than expectedOffset. Any thoughts ?

Thanks,
Prabhu
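
To make the suggestion concrete, here is a small, self-contained sketch of the
decision being proposed above. It is illustrative only and does not reuse the
real TopicPartitionWriter internals:

// Illustrative only: models the offset comparison discussed above, not the
// actual kafka-connect-hdfs code path.
public final class OffsetCheck {

    enum Action { ACCEPT, IGNORE_STALE, RESYNC_FORWARD }

    static Action decide(long recordOffset, long expectedOffset) {
        if (expectedOffset == -1) {
            return Action.ACCEPT;        // first record seen: adopt its offset
        }
        if (recordOffset == expectedOffset) {
            return Action.ACCEPT;        // normal in-order case
        }
        if (recordOffset < expectedOffset) {
            return Action.IGNORE_STALE;  // stale replay after a rebalance (current behaviour)
        }
        // recordOffset > expectedOffset: the expected messages were deleted by
        // retention, so jump forward instead of ignoring records forever.
        return Action.RESYNC_FORWARD;
    }

    public static void main(String[] args) {
        System.out.println(decide(120, 100)); // RESYNC_FORWARD
        System.out.println(decide(90, 100));  // IGNORE_STALE
        System.out.println(decide(100, 100)); // ACCEPT
    }
}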


Re: Read all record from a Topic.

2016-07-13 Thread James Cheng
Jean-Baptiste,

I wrote a blog post recently on this exact subject.

https://logallthethings.com/2016/06/28/how-to-read-to-the-end-of-a-kafka-topic/

Let me know if you find it useful.

-James

Sent from my iPhone

> On Jul 13, 2016, at 7:16 AM, g...@netcourrier.com wrote:
> 
> Hi,
> 
> 
> I'm using a compacted Kafka Topic to save the state of my application. When 
> the application crashes/restarts I can restore its state by reading the Kafka 
> topic.
> 
> 
> 
> However I need to read it completely, especially up to most recent record, to 
> be sure to restore all data.
> 
> 
> 
> Is there a standard way to do that? I've checked the Kafka Streams code and
> found that the class ProcessorStateManager seems to be doing something
> similar.
> 
> 
> 
> It first gets the last offset by doing:
> 
> // calculate the end offset of the partition // TODO: this is a bit hacky to 
> first seek then position to get the end offset
> 
> restoreConsumer.seekToEnd(singleton(storePartition));
> 
> long endOffset = restoreConsumer.position(storePartition); 
> 
> 
> 
> Then it polls the records until reaching the end offset (there is also another
> limit, but I think it is related to another use case).
> 
> 
> 
> I guess it works, but the TODO message makes me wonder if it is a good 
> solution and if it will continue to work in future releases.
> 
> 
> 
> Thanks for your help,
> 
> 
> 
> Jean-Baptiste
> 
> 
> 
> 
> 
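
A minimal sketch of the seekToEnd()/position() approach quoted above, assuming
the 0.10 consumer API and a single-partition compacted topic named "state" (the
topic name and partition count are hypothetical):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RestoreState {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "state-restore");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("state", 0);
            consumer.assign(Collections.singletonList(partition));

            // Capture the current end offset, then rewind and read up to it.
            consumer.seekToEnd(Collections.singletonList(partition));
            long endOffset = consumer.position(partition);
            consumer.seekToBeginning(Collections.singletonList(partition));

            while (consumer.position(partition) < endOffset) {
                for (ConsumerRecord<String, String> record : consumer.poll(500)) {
                    restore(record.key(), record.value());
                }
            }
        }
    }

    // Apply one key/value pair to the in-memory state being rebuilt.
    static void restore(String key, String value) {
        System.out.println(key + " -> " + value);
    }
}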


Re: Role of Producer

2016-07-13 Thread Snehal Nagmote
A REST API gives you a standard interface, so that you can hide the
implementation details of pushing data to the Kafka topic.

You don't need as many producers as data providers. You can have one Producer
send data to the Kafka topic.

Also, if you have a requirement to divide the data across multiple Kafka topics,
one for each data provider, then I think it would be an over-utilization of
resources and would be difficult to manage.

You may want to add the data provider information to the payload so that you
can have all the data in one topic (see the sketch below). It again depends on
your requirements and SLA.

Once you have the data in a topic, you can use Kafka Streams or a Kafka
consumer to write the data into the NoSQL data store.

Thanks,
Snehal
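
A minimal sketch of that idea, assuming a single topic named "provider-events"
and a JSON-ish payload (both hypothetical). The REST layer calls ingest() for
each inbound request, and one shared producer serves all data providers:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public final class IngestGateway {
    private final KafkaProducer<String, String> producer;

    public IngestGateway(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // KafkaProducer is thread-safe, so one instance can be shared by all
        // request handlers; you do not need one producer per data provider.
        this.producer = new KafkaProducer<>(props);
    }

    /** Called by the REST layer for every inbound payload. */
    public void ingest(String providerId, String payload) {
        // Keep the provider id in the message so a single topic can serve all providers.
        String value = "{\"provider\":\"" + providerId + "\",\"data\":" + payload + "}";
        producer.send(new ProducerRecord<>("provider-events", providerId, value));
    }

    public void close() {
        producer.close();
    }
}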

On 13 July 2016 at 13:52, Luo, Chao  wrote:

> Hi Snehal,
>
> Thanks for your input. They already have their own Java APIs to access
> data. But why do I create rest API? What is the benefits?
>
> Say, if there are 500 data providers, do I need 500 producers at my end to
> collect data? At least, the number of producers should be proportional to
> number of data providers. In addition, I also need maybe 500 or more kafka
> servers. You see, I think the system has too many producers. It is waste of
> money.
>
>
> Best,
> Chao
>
> -Original Message-
> From: Snehal Nagmote [mailto:nagmote.sne...@gmail.com]
> Sent: Wednesday, July 13, 2016 3:38 PM
> To: users@kafka.apache.org
> Subject: Re: Role of Producer
>
> Hi Chao ,
>
> To solve this problem , I can think of creating rest api . Your end point
> can have one of the parameter as data provider if you want to send it to
> different topics based on data provider  .
>
> On backend , when you get data , you can send it to Kafka Topics, using
> Kafka Producer at the end.
>
> Thanks,
> Snehal
>
>
>
> On 13 July 2016 at 13:31, Luo, Chao  wrote:
>
> > Dear Kafka guys,
> >
> > I just started to build up a Kafka system two weeks ago. Here I have a
> > question about how to design/implement the producer.
> >
> > In my system, there are many data providers. I need to collect
> > real-time data from them and store it in a NoSQL database. The problem
> > is that different data providers have their own Java APIs, and they
> > will not use Kafka-client to send data directly to my Kafka servers.
> > So I need to first collect data from them and feed it to the Kafka
> > servers. I guess I need to finish data acquisition in the Producers.
> > My question is that there are a great number of data providers so I
> > also need a lot of producers??? Or is there any more efficient ways to
> deal with it?
> >
> > Best,
> > Chao
> >
>


Re: Building API to make Kafka reactive

2016-07-13 Thread Dean Wampler
You don't have the Scala library on the app class path, which is used to
implement Akka.

Use the same version that's required for the Akka libraries you're using.

http://mvnrepository.com/artifact/org.scala-lang/scala-library

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Lightbend 
@deanwampler 
http://polyglotprogramming.com

On Wed, Jul 13, 2016 at 12:39 PM, Shekar Tippur  wrote:

> Dean,
>
> I am having trouble getting this to work.
>
> import akka.actor.ActorSystem;
> import akka.kafka.scaladsl.Producer;
> import akka.stream.javadsl.Source;
> import akka.kafka.ProducerSettings;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> abstract class KafkaPlProducer {
> protected static ActorSystem system = ActorSystem.create("example");
> protected static ProducerSettings producerSettings =
> ProducerSettings.create(system, new ByteArraySerializer(),
> new StringSerializer())
> .withBootstrapServers("localhost:9092");
> }
>
> class PlumberSink extends KafkaPlProducer {
> //protected final ActorSystem system = ActorSystem.create("example");
> public static void main(String args[]) {
>
> Source.range(1, 1)
> .map(n -> n.toString()).map(elem -> new
> ProducerRecord("topic1", elem))
> .to(Producer.plainSink(producerSettings));
> }
> //Source.range(1, 1).map(n -> n.toString()).map(elem -> new
> ProducerRecord("topic1",
> elem)).to(Producer.plainSink(producerSettings));
>
> }
>
>
> Here is the exception:
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> scala/collection/Iterable
> at
> com.companyname.plumber.service.rest.api.internals.KafkaPlProducer.(PlumberSink.java:25)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at
> com.intellij.rt.execution.application.AppMain.main(AppMain.java:122)
> Caused by: java.lang.ClassNotFoundException: scala.collection.Iterable
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 4 more
>
>
>
> On Sat, Jul 2, 2016 at 9:42 PM, Shekar Tippur  wrote:
>
> > Dean,
> >
> > Thanks a lot for the link. I am going through the documentation.
> >
> > - Shekar
> >
> > On Wed, Jun 29, 2016 at 9:50 AM, Dean Wampler 
> > wrote:
> >
> >> Here's another Reactive API: https://github.com/akka/reactive-kafka
> >>
> >> It was developed by Software Mill  and it's
> >> now
> >> being integrated with Akka .
> >>
> >> dean
> >>
> >> Dean Wampler, Ph.D.
> >> Author: Programming Scala, 2nd Edition
> >>  (O'Reilly)
> >> Lightbend 
> >> @deanwampler 
> >> http://polyglotprogramming.com
> >>
> >> On Wed, Jun 29, 2016 at 11:03 AM, Shekar Tippur 
> >> wrote:
> >>
> >> > Thanks for the suggestion Lohith. Will try that and provide a
> feedback.
> >> >
> >> > - Shekar
> >> >
> >> > On Tue, Jun 28, 2016 at 11:45 PM, Lohith Samaga M <
> >> > lohith.sam...@mphasis.com
> >> > > wrote:
> >> >
> >> > > Hi Shekar,
> >> > > Alternatively, you could make each stage of your pipeline to
> >> > write
> >> > > to a Cassandra (or other DB) and your API will read from it. With
> >> > Cassandra
> >> > > TTL, the row will be deleted after TTL is passed. No manual cleanup
> is
> >> > > required.
> >> > >
> >> > > Best regards / Mit freundlichen Grüßen / Sincères salutations
> >> > > M. Lohith Samaga
> >> > >
> >> > >
> >> > >
> >> > > -Original Message-
> >> > > From: Shekar Tippur [mailto:ctip...@gmail.com]
> >> > > Sent: Wednesday, June 29, 2016 12.10
> >> > > To: users
> >> > > Subject: Building API to make Kafka reactive
> >> > >
> >> > > I am looking at building a reactive api on top of Kafka.
> >> > > This API produces event to Kafka topic. I want to add a unique
> >> session id
> >> > > into the payload.
> >> > > The data gets transformed as it goes through different stages of a
> >> > > pipeline. I want to specify a final topic where I want the api to
> know
> >> > that
> >> > > the processing was successful.
> >> > > The API should give different status at each part of the pipeline.
> >> > > At the ingestion, the API responds with "submitted"
> >> > > During the progressio

KTable DSL join

2016-07-13 Thread Srikanth
Hello,

I'm trying the following join using KTable. There are two change log tables.
Table1
  111 -> aaa
  222 -> bbb
  333 -> aaa

Table2
  aaa -> 999
  bbb -> 888
  ccc -> 777

My result table should be
  111 -> 999
  222 -> 888
  333 -> 999

It's not a case for join() as the keys don't match. It's more of a lookup table.

Option 1 is to use Table1.toStream().process(ProcessorSupplier(), "storeName").
punctuate() will use a regular Kafka consumer that reads updates from Table2
and updates a private map (see the sketch below).
Process() will do a key-value lookup.
This has an advantage when Table1 is much larger than Table2.
Each instance of the processor will have to hold the entire Table2.

Option 2 is to re-partition Table1 using through(StreamPartitioner) and
partition on the value. This will ensure co-location. Then join with Table2.
This part might be tricky?
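
A sketch of the "private map" piece of Option 1, assuming Table2's changes land
in a topic named "table2-changelog" (hypothetical): a plain consumer tails it
and keeps an in-memory map that process() can consult and punctuate() can
refresh.

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class Table2Lookup {
    private final Map<String, String> lookup = new ConcurrentHashMap<>();
    private final KafkaConsumer<String, String> consumer;

    public Table2Lookup(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // A unique group id so every processor instance reads the whole table.
        props.put("group.id", "table2-lookup-" + UUID.randomUUID());
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("table2-changelog"));
    }

    /** Call from punctuate() to pull any new Table2 updates into the map. */
    public void refresh() {
        for (ConsumerRecord<String, String> record : consumer.poll(100)) {
            lookup.put(record.key(), record.value());   // e.g. "aaa" -> "999"
        }
    }

    /** Call from process() to resolve Table1's value (e.g. "aaa") to Table2's value ("999"). */
    public String resolve(String table1Value) {
        return lookup.get(table1Value);
    }
}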

Your comments and suggestions are welcome!

Srikanth


RE: Role of Producer

2016-07-13 Thread Luo, Chao
Hi Snehal,

Thanks for your input. They already have their own Java APIs to access data.
But why would I create a REST API? What are the benefits?

Say there are 500 data providers; do I need 500 producers at my end to collect
the data? At least, the number of producers would be proportional to the number
of data providers. In addition, I would also need maybe 500 or more Kafka
servers. You see, I think the system would have too many producers. It is a
waste of money.


Best,
Chao

-Original Message-
From: Snehal Nagmote [mailto:nagmote.sne...@gmail.com] 
Sent: Wednesday, July 13, 2016 3:38 PM
To: users@kafka.apache.org
Subject: Re: Role of Producer

Hi Chao ,

To solve this problem , I can think of creating rest api . Your end point can 
have one of the parameter as data provider if you want to send it to different 
topics based on data provider  .

On backend , when you get data , you can send it to Kafka Topics, using Kafka 
Producer at the end.

Thanks,
Snehal



On 13 July 2016 at 13:31, Luo, Chao  wrote:

> Dear Kafka guys,
>
> I just started to build up a Kafka system two weeks ago. Here I have a 
> question about how to design/implement the producer.
>
> In my system, there are many data providers. I need to collect 
> real-time data from them and store it in a NoSQL database. The problem 
> is that different data providers have their own Java APIs, and they 
> will not use Kafka-client to send data directly to my Kafka servers. 
> So I need to first collect data from them and feed it to the Kafka 
> servers. I guess I need to finish data acquisition in the Producers. 
> My question is that there are a great number of data providers so I 
> also need a lot of producers??? Or is there any more efficient ways to deal 
> with it?
>
> Best,
> Chao
>


Re: Role of Producer

2016-07-13 Thread Michael Freeman
MQ was just shorthand for IBM MQ or ActiveMQ, etc.

On Wed, Jul 13, 2016 at 9:42 PM, Luo, Chao  wrote:

> Hi thanks!
>
> Yes, I agree it is the best if they can use a kafka producer client. But I
> need to discuss with them if they will accept that.
>
> Btw, what is MQ?
>
>
>
> -Original Message-
> From: Michael Freeman [mailto:mikfree...@gmail.com]
> Sent: Wednesday, July 13, 2016 3:36 PM
> To: users@kafka.apache.org
> Subject: Re: Role of Producer
>
> Could you write them a client that uses the Kafka producer?
> You could also write some restful services that send the data to kafka.
> If they use MQ you could listen to MQ and send to Kafka.
>
>
>
> On Wed, Jul 13, 2016 at 9:31 PM, Luo, Chao  wrote:
>
> > Dear Kafka guys,
> >
> > I just started to build up a Kafka system two weeks ago. Here I have a
> > question about how to design/implement the producer.
> >
> > In my system, there are many data providers. I need to collect
> > real-time data from them and store it in a NoSQL database. The problem
> > is that different data providers have their own Java APIs, and they
> > will not use Kafka-client to send data directly to my Kafka servers.
> > So I need to first collect data from them and feed it to the Kafka
> > servers. I guess I need to finish data acquisition in the Producers.
> > My question is that there are a great number of data providers so I
> > also need a lot of producers??? Or is there any more efficient ways to
> deal with it?
> >
> > Best,
> > Chao
> >
>


RE: Role of Producer

2016-07-13 Thread Luo, Chao
Hi thanks!

Yes, I agree it would be best if they can use a Kafka producer client, but I
need to discuss with them whether they will accept that.

Btw, what is MQ?



-Original Message-
From: Michael Freeman [mailto:mikfree...@gmail.com] 
Sent: Wednesday, July 13, 2016 3:36 PM
To: users@kafka.apache.org
Subject: Re: Role of Producer

Could you write them a client that uses the Kafka producer?
You could also write some restful services that send the data to kafka.
If they use MQ you could listen to MQ and send to Kafka.



On Wed, Jul 13, 2016 at 9:31 PM, Luo, Chao  wrote:

> Dear Kafka guys,
>
> I just started to build up a Kafka system two weeks ago. Here I have a 
> question about how to design/implement the producer.
>
> In my system, there are many data providers. I need to collect 
> real-time data from them and store it in a NoSQL database. The problem 
> is that different data providers have their own Java APIs, and they 
> will not use Kafka-client to send data directly to my Kafka servers. 
> So I need to first collect data from them and feed it to the Kafka 
> servers. I guess I need to finish data acquisition in the Producers. 
> My question is that there are a great number of data providers so I 
> also need a lot of producers??? Or is there any more efficient ways to deal 
> with it?
>
> Best,
> Chao
>


Re: Role of Producer

2016-07-13 Thread Snehal Nagmote
Hi Chao ,

To solve this problem , I can think of creating rest api . Your end point
can have one of the parameter as data provider if you want to send it to
different topics based on data provider  .

On backend , when you get data , you can send it to Kafka Topics, using
Kafka Producer at the end.

Thanks,
Snehal



On 13 July 2016 at 13:31, Luo, Chao  wrote:

> Dear Kafka guys,
>
> I just started to build up a Kafka system two weeks ago. Here I have a
> question about how to design/implement the producer.
>
> In my system, there are many data providers. I need to collect real-time
> data from them and store it in a NoSQL database. The problem is that
> different data providers have their own Java APIs, and they will not use
> Kafka-client to send data directly to my Kafka servers. So I need to first
> collect data from them and feed it to the Kafka servers. I guess I need to
> finish data acquisition in the Producers. My question is that there are a
> great number of data providers so I also need a lot of producers??? Or is
> there any more efficient ways to deal with it?
>
> Best,
> Chao
>


Re: Role of Producer

2016-07-13 Thread Michael Freeman
Could you write them a client that uses the Kafka producer?
You could also write some restful services that send the data to kafka.
If they use MQ you could listen to MQ and send to Kafka.



On Wed, Jul 13, 2016 at 9:31 PM, Luo, Chao  wrote:

> Dear Kafka guys,
>
> I just started to build up a Kafka system two weeks ago. Here I have a
> question about how to design/implement the producer.
>
> In my system, there are many data providers. I need to collect real-time
> data from them and store it in a NoSQL database. The problem is that
> different data providers have their own Java APIs, and they will not use
> Kafka-client to send data directly to my Kafka servers. So I need to first
> collect data from them and feed it to the Kafka servers. I guess I need to
> finish data acquisition in the Producers. My question is that there are a
> great number of data providers so I also need a lot of producers??? Or is
> there any more efficient ways to deal with it?
>
> Best,
> Chao
>


Role of Producer

2016-07-13 Thread Luo, Chao
Dear Kafka guys,

I just started to build up a Kafka system two weeks ago. Here I have a question 
about how to design/implement the producer.

In my system, there are many data providers. I need to collect real-time data 
from them and store it in a NoSQL database. The problem is that different data 
providers have their own Java APIs, and they will not use Kafka-client to send 
data directly to my Kafka servers. So I need to first collect data from them 
and feed it to the Kafka servers. I guess I need to finish data acquisition in 
the Producers. My question is that there are a great number of data providers 
so I also need a lot of producers??? Or is there any more efficient ways to 
deal with it?

Best,
Chao


Java 0.9.0.1 Consumer Does not failover

2016-07-13 Thread Michael Freeman
Hi,
I'm running a Kafka cluster with 3 nodes.
I have a topic with a replication factor of 3.
When I stop node 1 running kafka-topics.sh shows me that node 2 and 3 have
successfully failed over the partitions for the topic.

The message producers are still sending messages and I can still consume
them using kafka-console-consumer.sh

However, my 0.9.0.1 Java consumer stops consuming messages. (I thought maybe it
was rebalancing, but after an hour, nothing.)

The consumer logs simply contain lots of "Marking the coordinator
8147683647 dead."

If I start node 1 up the consumers detect this and start to consume
messages again.

On startup of the consumers I can see that all three nodes are listed in
the "bootstrap.servers".

Any idea what's wrong with the Java consumer?

Thanks for any help or pointers in the right direction.

Michael


Re: Consumer Offsets and Open FDs

2016-07-13 Thread Rakesh Vidyadharan
We ran into this as well, and I ended up with the following that works for us.

log.cleaner.dedupe.buffer.size=536870912
log.cleaner.io.buffer.size=2000





On 13/07/2016 14:01, "Lawrence Weikum"  wrote:

>Apologies. Here is the full trace from a broker:
>
>[2016-06-24 09:57:39,881] ERROR [kafka-log-cleaner-thread-0], Error due to  
>(kafka.log.LogCleaner)
>java.lang.IllegalArgumentException: requirement failed: 9730197928 messages in 
>segment __consumer_offsets-36/.log but offset map can fit 
>only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease 
>log.cleaner.threads
>at scala.Predef$.require(Predef.scala:219)
>at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
>at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
>at 
> scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
>at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
>at kafka.log.Cleaner.clean(LogCleaner.scala:322)
>at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
>at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
>at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>[2016-06-24 09:57:39,881] INFO [kafka-log-cleaner-thread-0], Stopped  
>(kafka.log.LogCleaner)
>
>
>Is log.cleaner.dedupe.buffer.size a broker setting?  What is a good number to 
>set it to?
>
>
>
>Lawrence Weikum 
>
>
>On 7/13/16, 11:18 AM, "Manikumar Reddy"  wrote:
>
>Can you post the complete error stack trace?   Yes, you need to
>restart the affected
>brokers.
>You can tweak log.cleaner.dedupe.buffer.size, log.cleaner.io.buffer.size
>configs.
>
>Some related JIRAs:
>
>https://issues.apache.org/jira/browse/KAFKA-3587
>https://issues.apache.org/jira/browse/KAFKA-3894
>https://issues.apache.org/jira/browse/KAFKA-3915
>
>On Wed, Jul 13, 2016 at 10:36 PM, Lawrence Weikum 
>wrote:
>
>> Oh interesting. I didn’t know about that log file until now.
>>
>> The only error that has been populated among all brokers showing this
>> behavior is:
>>
>> ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
>>
>> Then we see many messages like this:
>>
>> INFO Compaction for partition [__consumer_offsets,30] is resumed
>> (kafka.log.LogCleaner)
>> INFO The cleaning for partition [__consumer_offsets,30] is aborted
>> (kafka.log.LogCleaner)
>>
>> Using Visual VM, I do not see any log-cleaner threads in those brokers.  I
>> do see it in the brokers not showing this behavior though.
>>
>> Any idea why the LogCleaner failed?
>>
>> As a temporary fix, should we restart the affected brokers?
>>
>> Thanks again!
>>
>>
>> Lawrence Weikum
>>
>> On 7/13/16, 10:34 AM, "Manikumar Reddy"  wrote:
>>
>> Hi,
>>
>> Are you seeing any errors in log-cleaner.log?  The log-cleaner thread can
>> crash on certain errors.
>>
>> Thanks
>> Manikumar
>>
>> On Wed, Jul 13, 2016 at 9:54 PM, Lawrence Weikum 
>> wrote:
>>
>> > Hello,
>> >
>> > We’re seeing a strange behavior in Kafka 0.9.0.1 which occurs about every
>> > other week.  I’m curious if others have seen it and know of a solution.
>> >
>> > Setup and Scenario:
>> >
>> > -  Brokers initially setup with log compaction turned off
>> >
>> > -  After 30 days, log compaction was turned on
>> >
>> > -  At this time, the number of Open FDs was ~ 30K per broker.
>> >
>> > -  After 2 days, the __consumer_offsets topic was compacted
>> > fully.  Open FDs reduced to ~5K per broker.
>> >
>> > -  Cluster has been under normal load for roughly 7 days.
>> >
>> > -  At the 7 day mark, __consumer_offsets topic seems to have
>> > stopped compacting on two of the brokers, and on those brokers, the FD
>> > count is up to ~25K.
>> >
>> >
>> > We have tried rebalancing the partitions before.  The first time, the
>> > destination broker had compacted the data fine and open FDs were low. The
>> > second time, the destination broker kept the FDs open.
>> >
>> >
>> > In all the broker logs, we’re seeing this messages:
>> > INFO [Group Metadata Manager on Broker 8]: Removed 0 expired offsets in 0
>> > milliseconds. (kafka.coordinator.GroupMetadataManager)
>> >
>> > There are only 4 consumers at the moment on the cluster; one topic with
>> 92
>> > partitions.
>> >
>> > Is there a reason why log compaction may stop working or why the
>> > __consumer_offsets topic would start holding thousands of FDs?
>> >
>> > Thank you all for your help!
>> >
>> > Lawrence Weikum
>> >
>> >
>>
>>
>>
>
>


Re: Contribution : KafkaStreams CEP library

2016-07-13 Thread Guozhang Wang
Added to the eco-system page, thanks for your sharing again!

Cheers,
Guozhang

On Mon, Jul 11, 2016 at 12:40 PM, Florian Hussonnois 
wrote:

> Hi,
>
> It would be very great if you can link my repo. Thank very much.
>
> 2016-07-11 18:26 GMT+02:00 Guozhang Wang :
>
> > Thanks Florian!
> >
> >
> > Do you mind if I link your repo in the Kafka Streams eco-system page?
> >
> >
> > Guozhang
> >
> > On Mon, Jul 11, 2016 at 1:37 AM, Michael Noll 
> > wrote:
> >
> > > Thanks for sharing, Florian!
> > >
> > > -Michael
> > >
> > >
> > > On Fri, Jul 8, 2016 at 6:48 PM, Florian Hussonnois <
> > fhussonn...@gmail.com>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > Since a few weeks I'm working for fun on a CEP library on top of
> > > > KafkaStreams.
> > > > There is some progress and I think my project start to look
> something,
> > or
> > > > at least I hope ;)
> > > >
> > > > https://github.com/fhussonnois/kafkastreams-cep
> > > >
> > > > So I'm pleased to share it with you (I already shared it with dev
> > mailing
> > > > list but I just realised that I've forgotten to add the user list ^^
> ).
> > > >
> > > > Currently, I'm looking to test my library against real use-cases. If
> > some
> > > > of you test it please I would appreciate any feedback.
> > > >
> > > > Any contribution is welcome. I'm sure this project can be improved in
> > > many
> > > > ways.
> > > >
> > > > Thank in advance,
> > > >
> > > > --
> > > > Florian HUSSONNOIS
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> Florian HUSSONNOIS
>



-- 
-- Guozhang


Re: Consumer Offsets and Open FDs

2016-07-13 Thread Lawrence Weikum
Apologies. Here is the full trace from a broker:

[2016-06-24 09:57:39,881] ERROR [kafka-log-cleaner-thread-0], Error due to  
(kafka.log.LogCleaner)
java.lang.IllegalArgumentException: requirement failed: 9730197928 messages in 
segment __consumer_offsets-36/.log but offset map can fit 
only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease 
log.cleaner.threads
at scala.Predef$.require(Predef.scala:219)
at 
kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
at 
kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
at 
scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
at kafka.log.Cleaner.clean(LogCleaner.scala:322)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-06-24 09:57:39,881] INFO [kafka-log-cleaner-thread-0], Stopped  
(kafka.log.LogCleaner)


Is log.cleaner.dedupe.buffer.size a broker setting?  What is a good number to 
set it to?



Lawrence Weikum 


On 7/13/16, 11:18 AM, "Manikumar Reddy"  wrote:

Can you post the complete error stack trace?   Yes, you need to
restart the affected
brokers.
You can tweak log.cleaner.dedupe.buffer.size, log.cleaner.io.buffer.size
configs.

Some related JIRAs:

https://issues.apache.org/jira/browse/KAFKA-3587
https://issues.apache.org/jira/browse/KAFKA-3894
https://issues.apache.org/jira/browse/KAFKA-3915

On Wed, Jul 13, 2016 at 10:36 PM, Lawrence Weikum 
wrote:

> Oh interesting. I didn’t know about that log file until now.
>
> The only error that has been populated among all brokers showing this
> behavior is:
>
> ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
>
> Then we see many messages like this:
>
> INFO Compaction for partition [__consumer_offsets,30] is resumed
> (kafka.log.LogCleaner)
> INFO The cleaning for partition [__consumer_offsets,30] is aborted
> (kafka.log.LogCleaner)
>
> Using Visual VM, I do not see any log-cleaner threads in those brokers.  I
> do see it in the brokers not showing this behavior though.
>
> Any idea why the LogCleaner failed?
>
> As a temporary fix, should we restart the affected brokers?
>
> Thanks again!
>
>
> Lawrence Weikum
>
> On 7/13/16, 10:34 AM, "Manikumar Reddy"  wrote:
>
> Hi,
>
> Are you seeing any errors in log-cleaner.log?  The log-cleaner thread can
> crash on certain errors.
>
> Thanks
> Manikumar
>
> On Wed, Jul 13, 2016 at 9:54 PM, Lawrence Weikum 
> wrote:
>
> > Hello,
> >
> > We’re seeing a strange behavior in Kafka 0.9.0.1 which occurs about every
> > other week.  I’m curious if others have seen it and know of a solution.
> >
> > Setup and Scenario:
> >
> > -  Brokers initially setup with log compaction turned off
> >
> > -  After 30 days, log compaction was turned on
> >
> > -  At this time, the number of Open FDs was ~ 30K per broker.
> >
> > -  After 2 days, the __consumer_offsets topic was compacted
> > fully.  Open FDs reduced to ~5K per broker.
> >
> > -  Cluster has been under normal load for roughly 7 days.
> >
> > -  At the 7 day mark, __consumer_offsets topic seems to have
> > stopped compacting on two of the brokers, and on those brokers, the FD
> > count is up to ~25K.
> >
> >
> > We have tried rebalancing the partitions before.  The first time, the
> > destination broker had compacted the data fine and open FDs were low. The
> > second time, the destination broker kept the FDs open.
> >
> >
> > In all the broker logs, we’re seeing this messages:
> > INFO [Group Metadata Manager on Broker 8]: Removed 0 expired offsets in 0
> > milliseconds. (kafka.coordinator.GroupMetadataManager)
> >
> > There are only 4 consumers at the moment on the cluster; one topic with
> 92
> > partitions.
> >
> > Is there a reason why log compaction may stop working or why the
> > __consumer_offsets topic would start holding thousands of FDs?
> >
> > Thank you all for your help!
> >
> > Lawrence Weikum
> >
> >
>
>
>




Re: Kafka Streams reducebykey and tumbling window - need final windowed KTable values

2016-07-13 Thread Srikanth
Thanks.

This would be useful in places where we use a key-value store just to duplicate
a KTable for get() operations.
Any rough idea when this is targeted for release?

It's still not clear how to use this for the case this thread was started for.
Does Kafka Streams keep windows alive forever?
At some point we need to "complete" a window, right? Either based on processing
time, or event time + watermark, etc.
How can we tie an internal state store query to window completion, i.e., get
the final value?

Srikanth

On Thu, Jul 7, 2016 at 2:05 PM, Eno Thereska  wrote:

> Hi Srikanth, Clive,
>
> Today we just added some example code usage in the KIP after feedback from
> the community. There is code that shows how to access a WindowStore (in
> read-only mode).
>
> Thanks
> Eno
>
>
> > On 7 Jul 2016, at 15:57, Srikanth  wrote:
> >
> > Eno,
> >
> > I was also looking for something similar. To output aggregate value once
> > the window is "complete".
> > I'm not sure getting individual update for an aggregate operator is that
> > useful.
> >
> > With KIP-67, will we have access to Windowed[key]( key + timestamp) and
> > value?
> > Does until() clear this store when time passes?
> >
> > Srikanth
> >
> > On Thu, Jun 30, 2016 at 4:27 AM, Clive Cox  >
> > wrote:
> >
> >> Hi Eno,
> >> I've looked at KIP-67. It looks good but its not clear what calls I
> would
> >> make to do what I presently need: Get access to each windowed store at
> some
> >> time soon after window end time. I can then use the methods specified to
> >> iterate over keys and values. Can you point me to the relevant
> >> method/technique for this?
> >>
> >> Thanks,
> >> Clive
> >>
> >>
> >>On Tuesday, 28 June 2016, 12:47, Eno Thereska <
> eno.there...@gmail.com>
> >> wrote:
> >>
> >>
> >> Hi Clive,
> >>
> >> As promised, here is the link to the KIP that just went out today.
> >> Feedback welcome:
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
> >> <
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67:+Queryable+state+for+Kafka+Streams
> >>>
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 27 Jun 2016, at 20:56, Eno Thereska  wrote:
> >>>
> >>> Hi Clive,
> >>>
> >>> We are working on exposing the state store behind a KTable as part of
> >> allowing for queries to the structures currently hidden behind the
> language
> >> (DSL). The KIP should be out today or tomorrow for you to have a look.
> You
> >> can probably do what you need using the low-level processor API but then
> >> you'd lose the benefits of the DSL and would have to maintain your own
> >> structures.
> >>>
> >>> Thanks,
> >>> Eno
> >>>
>  On 26 Jun 2016, at 18:42, Clive Cox 
> >> wrote:
> 
>  Following on from this thread, if I want to iterate over a KTable at
> >> the end of its hopping/tumbling Time Window how can I do this at present
> >> myself? Is there a way to access these structures?
>  If this is not possible it would seem I need to duplicate and manage
> >> something similar to a list of windowed KTables myself which is not
> really
> >> ideal.
>  Thanks for any help,
>  Clive
> 
> 
>  On Monday, 13 June 2016, 16:03, Eno Thereska 
> >> wrote:
> 
> 
>  Hi Clive,
> 
>  For now this optimisation is not present. We're working on it as part
> >> of KIP-63. One manual work-around might be to use a simple Key-value
> store
> >> to deduplicate the final output before sending to the backend. It could
> >> have a simple policy like "output all values at 1 second intervals" or
> >> "output after 10 records have been received".
> 
>  Eno
> 
> 
> > On 13 Jun 2016, at 13:36, Clive Cox 
> >> wrote:
> >
> >
> > Thanks Eno for your comments and references.
> > Perhaps, I can explain what I want to achieve and maybe you can
> >> suggest the correct topology?
> > I want process a stream of events and do aggregation and send to an
> >> analytics backend (Influxdb), so that rather than sending 1000
> points/sec
> >> to the analytics backend, I send a much lower value. I'm only
> interested in
> >> using the processing time of the event so in that respect there are no
> >> "late arriving" events.I was hoping I could use a Tumbling window which
> >> when its end-time had been passed I can send the consolidated
> aggregation
> >> for that window and then throw the Window away.
> >
> > It sounds like from the references you give that this is not possible
> >> at present in Kafka Streams?
> >
> > Thanks,
> > Clive
> >
> >   On Monday, 13 June 2016, 11:32, Eno Thereska <
> >> eno.there...@gmail.com> wrote:
> >
> >
> > Hi Clive,
> >
> > The behaviour you are seeing is indeed correct (though not
> necessarily
> >> optimal in terms of performance as described in this JIRA:
> >> https://issues.apache.org/jira/browse/KAFKA-3101 <
> >> https://issues.apache.org/jira/browse/KAFKA-3101>)
> >
> > The key obs

Re: Building API to make Kafka reactive

2016-07-13 Thread Shekar Tippur
Is there any way I can get a small working example to start with?

- Shekar
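
Not an official example, but a minimal sketch of what a runnable version might
look like, assuming the akka-stream-kafka (reactive-kafka) javadsl and a broker
on localhost:9092; exact class names and signatures depend on the library
version, and as Dean noted, a scala-library matching your Akka version still
has to be on the classpath for any of this to load:

import akka.Done;
import akka.actor.ActorSystem;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.Producer;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainSinkExample {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("example");
        Materializer materializer = ActorMaterializer.create(system);

        ProducerSettings<byte[], String> producerSettings =
                ProducerSettings.create(system, new ByteArraySerializer(), new StringSerializer())
                        .withBootstrapServers("localhost:9092");

        // Build the stream and actually run it -- without runWith()/run() nothing is sent.
        CompletionStage<Done> done =
                Source.range(1, 100)
                        .map(n -> n.toString())
                        .map(value -> new ProducerRecord<byte[], String>("topic1", value))
                        .runWith(Producer.plainSink(producerSettings), materializer);

        // Shut the actor system down once all 100 records have been produced.
        done.thenRun(system::terminate);
    }
}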

On Wed, Jul 13, 2016 at 10:39 AM, Shekar Tippur  wrote:

> Dean,
>
> I am having trouble getting this to work.
>
> import akka.actor.ActorSystem;
> import akka.kafka.scaladsl.Producer;
> import akka.stream.javadsl.Source;
> import akka.kafka.ProducerSettings;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> abstract class KafkaPlProducer {
> protected static ActorSystem system = ActorSystem.create("example");
> protected static ProducerSettings producerSettings =
> ProducerSettings.create(system, new ByteArraySerializer(), new 
> StringSerializer())
> .withBootstrapServers("localhost:9092");
> }
>
> class PlumberSink extends KafkaPlProducer {
> //protected final ActorSystem system = ActorSystem.create("example");
> public static void main(String args[]) {
>
> Source.range(1, 1)
> .map(n -> n.toString()).map(elem -> new 
> ProducerRecord("topic1", elem))
> .to(Producer.plainSink(producerSettings));
> }
> //Source.range(1, 1).map(n -> n.toString()).map(elem -> new 
> ProducerRecord("topic1", 
> elem)).to(Producer.plainSink(producerSettings));
>
> }
>
>
> Here is the exception:
>
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> scala/collection/Iterable
>   at 
> com.companyname.plumber.service.rest.api.internals.KafkaPlProducer.(PlumberSink.java:25)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:264)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:122)
> Caused by: java.lang.ClassNotFoundException: scala.collection.Iterable
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 4 more
>
>
>
> On Sat, Jul 2, 2016 at 9:42 PM, Shekar Tippur  wrote:
>
>> Dean,
>>
>> Thanks a lot for the link. I am going through the documentation.
>>
>> - Shekar
>>
>> On Wed, Jun 29, 2016 at 9:50 AM, Dean Wampler 
>> wrote:
>>
>>> Here's another Reactive API: https://github.com/akka/reactive-kafka
>>>
>>> It was developed by Software Mill  and it's
>>> now
>>> being integrated with Akka .
>>>
>>> dean
>>>
>>> Dean Wampler, Ph.D.
>>> Author: Programming Scala, 2nd Edition
>>>  (O'Reilly)
>>> Lightbend 
>>> @deanwampler 
>>> http://polyglotprogramming.com
>>>
>>> On Wed, Jun 29, 2016 at 11:03 AM, Shekar Tippur 
>>> wrote:
>>>
>>> > Thanks for the suggestion Lohith. Will try that and provide a feedback.
>>> >
>>> > - Shekar
>>> >
>>> > On Tue, Jun 28, 2016 at 11:45 PM, Lohith Samaga M <
>>> > lohith.sam...@mphasis.com
>>> > > wrote:
>>> >
>>> > > Hi Shekar,
>>> > > Alternatively, you could make each stage of your pipeline to
>>> > write
>>> > > to a Cassandra (or other DB) and your API will read from it. With
>>> > Cassandra
>>> > > TTL, the row will be deleted after TTL is passed. No manual cleanup
>>> is
>>> > > required.
>>> > >
>>> > > Best regards / Mit freundlichen Grüßen / Sincères salutations
>>> > > M. Lohith Samaga
>>> > >
>>> > >
>>> > >
>>> > > -Original Message-
>>> > > From: Shekar Tippur [mailto:ctip...@gmail.com]
>>> > > Sent: Wednesday, June 29, 2016 12.10
>>> > > To: users
>>> > > Subject: Building API to make Kafka reactive
>>> > >
>>> > > I am looking at building a reactive api on top of Kafka.
>>> > > This API produces event to Kafka topic. I want to add a unique
>>> session id
>>> > > into the payload.
>>> > > The data gets transformed as it goes through different stages of a
>>> > > pipeline. I want to specify a final topic where I want the api to
>>> know
>>> > that
>>> > > the processing was successful.
>>> > > The API should give different status at each part of the pipeline.
>>> > > At the ingestion, the API responds with "submitted"
>>> > > During the progression, the API returns "in progress"
>>> > > After successful completion, the API returns "Success"
>>> > >
>>> > > Couple of questions:
>>> > > 1. Is this feasible?
>>> > > 2. I was looking at project reactor (https://projectreactor.io)
>>> where
>>> > the
>>> > > docs talk about event bus. I wanted to see if I can implement a
>>> consumer
>>> > > that points to the "end" topic and throws an event into the event
>>> bus.
>>> > > Since I would know the session ID, I c

Re: Building API to make Kafka reactive

2016-07-13 Thread Shekar Tippur
Dean,

I am having trouble getting this to work.

import akka.actor.ActorSystem;
import akka.kafka.scaladsl.Producer;
import akka.stream.javadsl.Source;
import akka.kafka.ProducerSettings;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

abstract class KafkaPlProducer {
protected static ActorSystem system = ActorSystem.create("example");
protected static ProducerSettings producerSettings =
ProducerSettings.create(system, new ByteArraySerializer(),
new StringSerializer())
.withBootstrapServers("localhost:9092");
}

class PlumberSink extends KafkaPlProducer {
//protected final ActorSystem system = ActorSystem.create("example");
public static void main(String args[]) {

Source.range(1, 1)
.map(n -> n.toString()).map(elem -> new
ProducerRecord("topic1", elem))
.to(Producer.plainSink(producerSettings));
}
//Source.range(1, 1).map(n -> n.toString()).map(elem -> new
ProducerRecord("topic1",
elem)).to(Producer.plainSink(producerSettings));

}


Here is the exception:

Exception in thread "main" java.lang.NoClassDefFoundError:
scala/collection/Iterable
at 
com.companyname.plumber.service.rest.api.internals.KafkaPlProducer.(PlumberSink.java:25)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:122)
Caused by: java.lang.ClassNotFoundException: scala.collection.Iterable
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 4 more



On Sat, Jul 2, 2016 at 9:42 PM, Shekar Tippur  wrote:

> Dean,
>
> Thanks a lot for the link. I am going through the documentation.
>
> - Shekar
>
> On Wed, Jun 29, 2016 at 9:50 AM, Dean Wampler 
> wrote:
>
>> Here's another Reactive API: https://github.com/akka/reactive-kafka
>>
>> It was developed by Software Mill  and it's
>> now
>> being integrated with Akka .
>>
>> dean
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>>  (O'Reilly)
>> Lightbend 
>> @deanwampler 
>> http://polyglotprogramming.com
>>
>> On Wed, Jun 29, 2016 at 11:03 AM, Shekar Tippur 
>> wrote:
>>
>> > Thanks for the suggestion Lohith. Will try that and provide a feedback.
>> >
>> > - Shekar
>> >
>> > On Tue, Jun 28, 2016 at 11:45 PM, Lohith Samaga M <
>> > lohith.sam...@mphasis.com
>> > > wrote:
>> >
>> > > Hi Shekar,
>> > > Alternatively, you could make each stage of your pipeline to
>> > write
>> > > to a Cassandra (or other DB) and your API will read from it. With
>> > Cassandra
>> > > TTL, the row will be deleted after TTL is passed. No manual cleanup is
>> > > required.
>> > >
>> > > Best regards / Mit freundlichen Grüßen / Sincères salutations
>> > > M. Lohith Samaga
>> > >
>> > >
>> > >
>> > > -Original Message-
>> > > From: Shekar Tippur [mailto:ctip...@gmail.com]
>> > > Sent: Wednesday, June 29, 2016 12.10
>> > > To: users
>> > > Subject: Building API to make Kafka reactive
>> > >
>> > > I am looking at building a reactive API on top of Kafka.
>> > > This API produces events to a Kafka topic. I want to add a unique
>> > > session id into the payload.
>> > > The data gets transformed as it goes through different stages of a
>> > > pipeline. I want to specify a final topic from which the API can know
>> > > that the processing was successful.
>> > > The API should give a different status at each part of the pipeline:
>> > > At ingestion, the API responds with "submitted"
>> > > During progression, the API returns "in progress"
>> > > After successful completion, the API returns "Success"
>> > >
>> > > A couple of questions:
>> > > 1. Is this feasible?
>> > > 2. I was looking at Project Reactor (https://projectreactor.io), where
>> > > the docs talk about an event bus. I wanted to see if I can implement a
>> > > consumer that points to the "end" topic and throws an event into the
>> > > event bus. Since I would know the session ID, I can process the
>> > > request accordingly.
>> > >
>> > > Appreciate your inputs.
>> > >
>> > > - Shekar
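
A minimal sketch of the session-tracking idea above, assuming the API process
keeps a pending-request map and that "pipeline.input" and "pipeline.done" are
the (hypothetical) first and final topics of the pipeline:

import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SessionTracker {
    // sessionId -> future that completes once the final topic sees that session
    private final ConcurrentMap<String, CompletableFuture<String>> pending =
        new ConcurrentHashMap<>();
    private final KafkaProducer<String, String> producer;

    public SessionTracker(Properties producerProps) {
        this.producer = new KafkaProducer<>(producerProps);
    }

    // Ingestion: attach a unique session id as the record key, answer "submitted".
    public String submit(String payload) {
        String sessionId = UUID.randomUUID().toString();
        pending.put(sessionId, new CompletableFuture<>());
        producer.send(new ProducerRecord<>("pipeline.input", sessionId, payload));
        return sessionId;
    }

    // Status endpoint: "in progress" until the final topic confirms the session.
    public String status(String sessionId) {
        CompletableFuture<String> result = pending.get(sessionId);
        if (result == null) return "unknown";
        return result.isDone() ? "Success" : "in progress";
    }

    // Background loop: a consumer on the final topic completes the matching session.
    public void runEndTopicConsumer(Properties consumerProps) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("pipeline.done"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(500)) {
                    CompletableFuture<String> result = pending.get(record.key());
                    if (result != null) result.complete(record.value());
                }
            }
        }
    }
}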

Re: Consumer Offsets and Open FDs

2016-07-13 Thread Manikumar Reddy
Can you post the complete error stack trace?   Yes, you need to
restart the affected
brokers.
You can tweak log.cleaner.dedupe.buffer.size, log.cleaner.io.buffer.size
configs.

Some related JIRAs:

https://issues.apache.org/jira/browse/KAFKA-3587
https://issues.apache.org/jira/browse/KAFKA-3894
https://issues.apache.org/jira/browse/KAFKA-3915

On Wed, Jul 13, 2016 at 10:36 PM, Lawrence Weikum 
wrote:

> Oh interesting. I didn’t know about that log file until now.
>
> The only error that has been populated among all brokers showing this
> behavior is:
>
> ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
>
> Then we see many messages like this:
>
> INFO Compaction for partition [__consumer_offsets,30] is resumed
> (kafka.log.LogCleaner)
> INFO The cleaning for partition [__consumer_offsets,30] is aborted
> (kafka.log.LogCleaner)
>
> Using Visual VM, I do not see any log-cleaner threads in those brokers.  I
> do see it in the brokers not showing this behavior though.
>
> Any idea why the LogCleaner failed?
>
> As a temporary fix, should we restart the affected brokers?
>
> Thanks again!
>
>
> Lawrence Weikum
>
> On 7/13/16, 10:34 AM, "Manikumar Reddy"  wrote:
>
> Hi,
>
> Are you seeing any errors in log-cleaner.log?  The log-cleaner thread can
> crash on certain errors.
>
> Thanks
> Manikumar
>
> On Wed, Jul 13, 2016 at 9:54 PM, Lawrence Weikum 
> wrote:
>
> > Hello,
> >
> > We’re seeing a strange behavior in Kafka 0.9.0.1 which occurs about every
> > other week.  I’m curious if others have seen it and know of a solution.
> >
> > Setup and Scenario:
> >
> > -  Brokers initially set up with log compaction turned off
> >
> > -  After 30 days, log compaction was turned on
> >
> > -  At this time, the number of Open FDs was ~ 30K per broker.
> >
> > -  After 2 days, the __consumer_offsets topic was compacted
> > fully.  Open FDs reduced to ~5K per broker.
> >
> > -  Cluster has been under normal load for roughly 7 days.
> >
> > -  At the 7 day mark, __consumer_offsets topic seems to have
> > stopped compacting on two of the brokers, and on those brokers, the FD
> > count is up to ~25K.
> >
> >
> > We have tried rebalancing the partitions before.  The first time, the
> > destination broker had compacted the data fine and open FDs were low. The
> > second time, the destination broker kept the FDs open.
> >
> >
> > In all the broker logs, we’re seeing this message:
> > INFO [Group Metadata Manager on Broker 8]: Removed 0 expired offsets in 0
> > milliseconds. (kafka.coordinator.GroupMetadataManager)
> >
> > There are only 4 consumers at the moment on the cluster; one topic with
> > 92 partitions.
> >
> > Is there a reason why log compaction may stop working or why the
> > __consumer_offsets topic would start holding thousands of FDs?
> >
> > Thank you all for your help!
> >
> > Lawrence Weikum
> >
> >
>
>
>


Re: Consumer Offsets and Open FDs

2016-07-13 Thread Lawrence Weikum
Oh interesting. I didn’t know about that log file until now.

The only error that has been populated among all brokers showing this behavior 
is:

ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)

Then we see many messages like this:

INFO Compaction for partition [__consumer_offsets,30] is resumed 
(kafka.log.LogCleaner)
INFO The cleaning for partition [__consumer_offsets,30] is aborted 
(kafka.log.LogCleaner)

Using Visual VM, I do not see any log-cleaner threads in those brokers.  I do 
see it in the brokers not showing this behavior though.

Any idea why the LogCleaner failed?

As a temporary fix, should we restart the affected brokers?

Thanks again!


Lawrence Weikum 
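
Since the cleaner thread is not visible in VisualVM, a minimal sketch of the same
check done programmatically over remote JMX (the broker JMX host/port are
assumptions; the thread name prefix matches the "kafka-log-cleaner-thread-0" seen
in the logs):

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class LogCleanerCheck {
    public static void main(String[] args) throws Exception {
        // Assumes the broker JVM was started with remote JMX enabled on port 9999.
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            ThreadMXBean threads = ManagementFactory.newPlatformMXBeanProxy(
                connection, ManagementFactory.THREAD_MXBEAN_NAME, ThreadMXBean.class);

            boolean cleanerAlive = false;
            for (ThreadInfo info : threads.getThreadInfo(threads.getAllThreadIds())) {
                if (info != null
                        && info.getThreadName().startsWith("kafka-log-cleaner-thread")) {
                    cleanerAlive = true;
                }
            }
            System.out.println(cleanerAlive
                ? "log cleaner thread is running"
                : "log cleaner thread is NOT running -- compaction will not resume");
        }
    }
}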

On 7/13/16, 10:34 AM, "Manikumar Reddy"  wrote:

Hi,

Are you seeing any errors in log-cleaner.log?  The log-cleaner thread can
crash on certain errors.

Thanks
Manikumar

On Wed, Jul 13, 2016 at 9:54 PM, Lawrence Weikum 
wrote:

> Hello,
>
> We’re seeing a strange behavior in Kafka 0.9.0.1 which occurs about every
> other week.  I’m curious if others have seen it and know of a solution.
>
> Setup and Scenario:
>
> -  Brokers initially set up with log compaction turned off
>
> -  After 30 days, log compaction was turned on
>
> -  At this time, the number of Open FDs was ~ 30K per broker.
>
> -  After 2 days, the __consumer_offsets topic was compacted
> fully.  Open FDs reduced to ~5K per broker.
>
> -  Cluster has been under normal load for roughly 7 days.
>
> -  At the 7 day mark, __consumer_offsets topic seems to have
> stopped compacting on two of the brokers, and on those brokers, the FD
> count is up to ~25K.
>
>
> We have tried rebalancing the partitions before.  The first time, the
> destination broker had compacted the data fine and open FDs were low. The
> second time, the destination broker kept the FDs open.
>
>
> In all the broker logs, we’re seeing this message:
> INFO [Group Metadata Manager on Broker 8]: Removed 0 expired offsets in 0
> milliseconds. (kafka.coordinator.GroupMetadataManager)
>
> There are only 4 consumers at the moment on the cluster; one topic with 92
> partitions.
>
> Is there a reason why log compaction may stop working or why the
> __consumer_offsets topic would start holding thousands of FDs?
>
> Thank you all for your help!
>
> Lawrence Weikum
>
>




Re: Consumer Offsets and Open FDs

2016-07-13 Thread Manikumar Reddy
Hi,

Are you seeing any errors in log-cleaner.log?  The log-cleaner thread can
crash on certain errors.

Thanks
Manikumar

On Wed, Jul 13, 2016 at 9:54 PM, Lawrence Weikum 
wrote:

> Hello,
>
> We’re seeing a strange behavior in Kafka 0.9.0.1 which occurs about every
> other week.  I’m curious if others have seen it and know of a solution.
>
> Setup and Scenario:
>
> -  Brokers initially set up with log compaction turned off
>
> -  After 30 days, log compaction was turned on
>
> -  At this time, the number of Open FDs was ~ 30K per broker.
>
> -  After 2 days, the __consumer_offsets topic was compacted
> fully.  Open FDs reduced to ~5K per broker.
>
> -  Cluster has been under normal load for roughly 7 days.
>
> -  At the 7 day mark, __consumer_offsets topic seems to have
> stopped compacting on two of the brokers, and on those brokers, the FD
> count is up to ~25K.
>
>
> We have tried rebalancing the partitions before.  The first time, the
> destination broker had compacted the data fine and open FDs were low. The
> second time, the destination broker kept the FDs open.
>
>
> In all the broker logs, we’re seeing this message:
> INFO [Group Metadata Manager on Broker 8]: Removed 0 expired offsets in 0
> milliseconds. (kafka.coordinator.GroupMetadataManager)
>
> There are only 4 consumers at the moment on the cluster; one topic with 92
> partitions.
>
> Is there a reason why log compaction may stop working or why the
> __consumer_offsets topic would start holding thousands of FDs?
>
> Thank you all for your help!
>
> Lawrence Weikum
>
>


Consumer Offsets and Open FDs

2016-07-13 Thread Lawrence Weikum
Hello,

We’re seeing a strange behavior in Kafka 0.9.0.1 which occurs about every other 
week.  I’m curious if others have seen it and know of a solution.

Setup and Scenario:

-  Brokers initially set up with log compaction turned off

-  After 30 days, log compaction was turned on

-  At this time, the number of Open FDs was ~ 30K per broker.

-  After 2 days, the __consumer_offsets topic was compacted fully.  
Open FDs reduced to ~5K per broker.

-  Cluster has been under normal load for roughly 7 days.

-  At the 7 day mark, __consumer_offsets topic seems to have stopped 
compacting on two of the brokers, and on those brokers, the FD count is up to 
~25K.


We have tried rebalancing the partitions before.  The first time, the 
destination broker had compacted the data fine and open FDs were low. The 
second time, the destination broker kept the FDs open.


In all the broker logs, we’re seeing this message:
INFO [Group Metadata Manager on Broker 8]: Removed 0 expired offsets in 0 
milliseconds. (kafka.coordinator.GroupMetadataManager)

There are only 4 consumers at the moment on the cluster; one topic with 92 
partitions.

Is there a reason why log compaction may stop working or why the 
__consumer_offsets topic would start holding thousands of FDs?

Thank you all for your help!

Lawrence Weikum
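
For watching the FD growth itself, a minimal sketch that reads a broker's open
file descriptor count over remote JMX (host/port are assumptions; the attributes
are exposed by the HotSpot OperatingSystem MBean on Unix-like systems):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class OpenFdCheck {
    public static void main(String[] args) throws Exception {
        // Assumes the broker JVM was started with remote JMX enabled on port 9999.
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            ObjectName os = new ObjectName("java.lang:type=OperatingSystem");
            Object open = connection.getAttribute(os, "OpenFileDescriptorCount");
            Object max = connection.getAttribute(os, "MaxFileDescriptorCount");
            System.out.println("open FDs: " + open + " / max: " + max);
        }
    }
}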



Read all records from a Topic.

2016-07-13 Thread gibe
Hi,


I'm using a compacted Kafka Topic to save the state of my application. When the 
application crashes/restarts I can restore its state by reading the Kafka topic.



However, I need to read it completely, up to the most recent record, to be sure
to restore all the data.



Is there a standard way to do that? I've checked the Kafka Streams code and
found that the class ProcessorStateManager seems to be doing something similar.



It first gets the last offset by doing:

// calculate the end offset of the partition
// TODO: this is a bit hacky to first seek then position to get the end offset
restoreConsumer.seekToEnd(singleton(storePartition));
long endOffset = restoreConsumer.position(storePartition);



Then it polls the records until reaching the endOffset (there is also another
limit, but I think it is related to another use case).



I guess it works, but the TODO message makes me wonder if it is a good solution 
and if it will continue to work in future releases.



Thanks for your help,



Jean-Baptiste
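
For comparison, a minimal sketch of the same seek-to-end pattern with a plain
0.10 consumer, outside of Kafka Streams (topic name, partition and configs are
assumptions):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class StateRestore {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("state-topic", 0);
            consumer.assign(Collections.singletonList(partition));

            // Same trick as ProcessorStateManager: seek to the end to learn the
            // end offset, then rewind and poll until that offset is reached.
            consumer.seekToEnd(Collections.singletonList(partition));
            long endOffset = consumer.position(partition);
            consumer.seekToBeginning(Collections.singletonList(partition));

            while (consumer.position(partition) < endOffset) {
                for (ConsumerRecord<String, String> record : consumer.poll(500)) {
                    restore(record.key(), record.value());
                }
            }
        }
    }

    // Placeholder for applying one record to the in-memory state.
    private static void restore(String key, String value) {
    }
}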







Re: Need a help in understanding __consumer_offsets topic creation in Kafka Cluster

2016-07-13 Thread Prasannalakshmi Sugumaran
Hi,



We are using Kafka 0.9.0.2, where the log cleaner is not enabled by default. When
we enable the log cleaner, the internal topic "__consumer_offsets" (around 1 TB
in size) starts compaction, and during compaction we are unable to
consume/produce messages. Also, consumer groups fail in leader election.



Partitions – 6

Kafka nodes – 3

Zookeeper – 3

Replication factor – 3



- Prasanna


Re: NetFlow metrics to Kafka

2016-07-13 Thread Michael Noll
Mathieu,

yes, this is possible.  In a past project of mine we did exactly this, though I
wasn't directly involved in coding the Cisco-Kafka part.  As far as I know there
aren't ready-to-use NetFlow connectors available (for Kafka Connect), so you will
most probably have to write your own connector and/or application to bridge
Cisco and Kafka.

-Michael
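
A minimal sketch of such a bridge application, assuming the router exports
NetFlow over UDP to port 2055 and that forwarding the raw datagrams to a
"netflow-raw" topic is enough (parsing the NetFlow records is left out):

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NetflowToKafkaBridge {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
             DatagramSocket socket = new DatagramSocket(2055)) {
            byte[] buffer = new byte[65535];
            while (true) {
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                socket.receive(packet);
                // Copy out the received bytes because the buffer is reused, and key
                // by exporter address so flows from one router stay in order.
                byte[] payload = Arrays.copyOf(packet.getData(), packet.getLength());
                producer.send(new ProducerRecord<>("netflow-raw",
                    packet.getAddress().getHostAddress(), payload));
            }
        }
    }
}

Logstash can then consume the "netflow-raw" topic with its Kafka input plugin.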


On Tue, Jul 12, 2016 at 4:50 PM, OZERAY MATHIEU 
wrote:

> Hello,
>
>
> I have a question about Kafka.
>
> Actually, I produce NetFlow metrics on my Cisco router. I want to know if
> it's possible to send NetFlow metrics to a Kafka broker and then forward
> them to a Logstash server?
>
> Thanks for your answer.
>
> Have a nice day.
>
>
> Mathieu OZERAY
>



-- 


Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
Download Apache Kafka and Confluent Platform: www.confluent.io/download


Re: Kafka Consumer Group Id bug?

2016-07-13 Thread Spico Florin
Hello!
  To me it seems that rebalancing is the reason some of the messages are
consumed by one consumer and some by the other. If you are using a random
client id, it can happen that during a rebalance a client id gets a "lower"
position than the consumer that was previously consuming the messages; the new
consumer then becomes the one that consumes, and the other one is on standby.

Kafka's rebalance algorithm orders the client ids alphabetically within each
group id and allocates the partitions based on this order.

I hope that it helps.
Regards,

Florin
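
To see which instance actually owns the single partition after each rebalance, a
minimal sketch with a rebalance listener (topic, group and client ids are
assumptions) that can be started twice with different client ids:

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceLogger {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("client.id", args.length > 0 ? args[0] : "consumer-a");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"),
                new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        System.out.println("revoked: " + partitions);
                    }
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        // With a single partition, only one instance at a time
                        // should ever see it assigned here.
                        System.out.println("assigned: " + partitions);
                    }
                });
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                    System.out.println("consumed offset " + record.offset());
                }
            }
        }
    }
}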

On Wed, Jul 13, 2016 at 8:59 AM, Gerard Klijs 
wrote:

> Are you sure the topic itself has indeed 1 partition?
> If so, the only partition should be assigned to one of them until some
> error/rebalance occurs. Does this indeed happen (a lot)?
>
> On Wed, Jul 13, 2016 at 7:19 AM BYEONG-GI KIM  wrote:
>
> > Hello.
> >
> > I'm not sure whether it's a bug or not, but here is something wrong;
> >
> > I set up 2 consumer apps that have the same consumer group id, and the
> > partition count has been set to 1 on my Kafka Broker.
> >
> > In theory, the messages on the Kafka Broker must be consumed by either
> > one of them, which means they must not be consumed by both. But the
> > messages were sometimes consumed by both of them.
> >
> > I found the definition of the partition at the Kafka Website:
> > *"Kafka only provides a total order over messages within a partition, not
> > between different partitions in a topic. Per-partition ordering combined
> > with the ability to partition data by key is sufficient for most
> > applications. However, if you require a total order over messages this
> can
> > be achieved with a topic that has only one partition, though this will
> mean
> > only one consumer process per consumer group."*
> >
> > What's wrong with my setting?
> >
> > Regards
> >
> > KIM
> >
>