Re: handling generics in Kafka Scala

2016-09-08 Thread Dean Wampler
Ah, yes. Scala interprets Java collection types, like ArrayList[T], as
*invariant*, which means that you can't use a declared ArrayList[String]
where you expect an ArrayList[Any], which would be an example of *covariance*.
(This is due to Java's flawed way of declaring generics, where the person
*declaring* the collection doesn't have the ability to specify variance
behavior. The *user* decides if a declaration is invariant, covariant, or
contravariant.)
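
A quick sketch of declaration-site variance for contrast (the Box name and
values are purely illustrative, not from the original mail): in Scala, whoever
declares the type picks the variance once, and every user gets it.

class Box[+A](val value: A)            // '+' makes Box covariant for all users

val bs: Box[String] = new Box("hello")
val ba: Box[Any]    = bs               // accepted, because Box is declared covariant

Java's ArrayList carries no such annotation, so from Scala it is treated as
invariant, which is what the session below shows.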

Example using the Scala interpreter:

scala> import java.util.ArrayList
import java.util.ArrayList

scala> var ala: ArrayList[Any] = null   // var so we can
attempt assignments.
ala: java.util.ArrayList[Any] = null

scala> ala = new ArrayList[String]()   // rejected. ala must be
assigned an ArrayList[Any]
:12: error: type mismatch;
 found   : java.util.ArrayList[String]
 required: java.util.ArrayList[Any]
Note: String <: Any, but Java-defined class ArrayList is invariant in type
E.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
   ala = new ArrayList[String]()
 ^

scala> val als = new ArrayList[String]()  // it doesn't work to
declare an ArrayList[String] then try to assign to ala
als: java.util.ArrayList[String] = []

scala> ala = als
:13: error: type mismatch;
 found   : java.util.ArrayList[String]
 required: java.util.ArrayList[Any]
Note: String <: Any, but Java-defined class ArrayList is invariant in type
E.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
   ala = als
 ^

scala> ala = new ArrayList[Any]()   // can only assign an
ArrayList[Any] instance.
ala: java.util.ArrayList[Any] = []

// *** BUT YOU CAN ASSIGN STRINGS, INTS, ETC. TO THE ARRAY
ITSELF 

scala> ala.add(1)
res3: Boolean = true

scala> ala.add("two")
res4: Boolean = true

scala> ala.add(3.3)
res5: Boolean = true

scala> ala
res6: java.util.ArrayList[Any] = [1, two, 3.3]

Note that the type of ala hasn't changed.

Contrast with Scala collections like Seq (List), which is covariant:

scala> var sa: Seq[Any] = null
sa: Seq[Any] = null

scala> sa = List.empty[String]
sa: Seq[Any] = List()

scala> sa = List(1, "two", 3.3)
sa: Seq[Any] = List(1, two, 3.3)

scala> val list = List(1, "two", 3.3)
list: List[Any] = List(1, two, 3.3)

scala> sa = list
sa: Seq[Any] = List(1, two, 3.3)

Note that the type of "sa" doesn't change, even when it's actually pointing
to a subclass that's covariant.


For completeness, there's also *contravariance*; if Foo[T] is
contravariant, then a Foo[String] would be a supertype of Foo[Any]. This
is less common and harder to understand, but an important example is the
argument types of function types, like Function2[-Arg1, -Arg2,
+Return], e.g.,

scala> var fisa: Function2[Int,String,Any] = null  // a function that
takes Int and String args and returns an Any
fisa: (Int, String) => Any = null

// A function that takes two Any arguments (note that Any is a supertype of
both Int and String), and returns String

scala> val faas: Function2[Any,Any,String] = (a1:Any, a2:Any) =>
a1.toString + a2.toString
faas: (Any, Any) => String = <function2>

scala> fisa = faas  // MIND
BLOWN!!
fisa: (Int, String) => Any = <function2>

scala> fisa(1, "two")
res7: Any = 1two

Why does this work? The Liskov Substitution Principle is really the
correct, technical definition of what we've called the "is a" relationship
in OOP. Fisa defines a contract: whatever function you actually use here
must be able to accept an Int and a String, and it must guarantee to
return an Any (that's easy). Faas satisfies this contract. Not only can it
accept an Int and a String, it can accept two Anys. That is, it is more
tolerant of what you pass it. It returns only a String, but that's okay
because the contract only requires that we return an Any.
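
As a quick sketch of the reverse direction (not part of the session above), the
assignment the other way around is rejected for exactly the contract reason:

scala> val faas2: Function2[Any,Any,String] = fisa
// rejected with a type mismatch: fisa only promises to handle an Int and a
// String, so it cannot stand in for a function whose callers may pass any two
// Anys, nor does it promise to return a String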

This is probably way more information than you wanted to hear ;) I cover
this in great depth in Programming Scala, 2nd Edition (O'Reilly), because so
few people, even with years of Java or other OO experience, understand this
supposedly core OO principle.

Back to your problem. The upshot is that you can't use the more careful
typing rules you tried. You're stuck with the declarations that can't
enforce the same behavior you want. However, I would do the following:

// trait Protocol defines a protocol for creating stuff of type K
// No + or -, so Protocol is invariant in K
trait Protocol[K] {
  def createStuff(): K
}

import java.util.ArrayList

// KCALB explicitly returns ArrayList[Byte]
object KCALB extends Protocol[ArrayList[Byte]] {
  def createStuff(): ArrayList[Byte] = new ArrayList[Byte]()
}

// KCLL explicitly returns List[Long]
object KCLL extends Protocol[List[Long]] {
  def createStuff(): List[Long] = List.empty[Long]
}

val kcalb = KCALB.createStuff()// kcalb: java.util.ArrayList[Byte] = []
val k

How to decommission a broker so the controller doesn't return it in the list of known brokers?

2016-09-08 Thread Jeff Widman
How do I permanently remove a broker from a Kafka cluster?

Scenario:

I have a stable cluster of 3 brokers. I temporarily added a fourth broker
that successfully joined the cluster. The controller returned metadata
indicating this broker was part of the cluster.

However, I never rebalanced partitions onto this broker, so this broker #4
was never actually used, didn't join any replica lists, ISRs, etc.

I later decided to remove this unused broker from the cluster. I shut down
the broker successfully and Zookeeper /brokers/ids no longer lists broker #4.

However, when my application code connects to any Kafka broker and fetches
metadata, I get a broker list that includes this deleted broker.

How do I indicate to the cluster that this broker has been permanently
removed and is not just experiencing transient downtime?

Additionally, what's happening under the covers that causes this?

I'm guessing that when I connect to a broker and ask for metadata, the
broker checks its local cache for the controller ID, contacts the controller,
and asks it for the list of all brokers. Then the controller checks its
cached list of brokers and returns the list of all brokers known to have
belonged to the cluster at any point in time.

I'm guessing this happens because it's not certain whether the dead broker is
permanently removed or just experiencing transient downtime. So I'm thinking I
just need to indicate to the controller that it needs to reset its list of
known cluster brokers to the live brokers registered in Zookeeper. But I would
not be surprised if something in my mental model is incorrect.

This is for Kafka 0.8.2. I am planning to upgrade to 0.10 in the
not-too-distant future, so if 0.10 handles this differently, I'm also
curious about that.


-- 

*Jeff Widman*
jeffwidman.com  | 740-WIDMAN-J (943-6265)
<><


RE: handling generics in Kafka Scala

2016-09-08 Thread Martin Gainty
yes, that function compiled, but every time I implement createNewConsumer() the scala
compiler wants to downcast
[java.util.ArrayList[Byte],java.util.ArrayList[Byte]] to a wildcard?
  consumer = createNewConsumer() //getConsumer()
[ERROR]  found   : 
org.apache.kafka.clients.consumer.KafkaConsumer[java.util.ArrayList[Byte],java.util.ArrayList[Byte]]
[ERROR]  required: org.apache.kafka.clients.consumer.KafkaConsumer[K,V]
[ERROR] Note: java.util.ArrayList[Byte] >: K, but Java-defined class 
KafkaConsumer is invariant in type K.
[ERROR] You may wish to investigate a wildcard type such as `_ >: K`. (SLS 
3.2.10)
[ERROR] Note: java.util.ArrayList[Byte] >: V, but Java-defined class 
KafkaConsumer is invariant in type V.
[ERROR] You may wish to investigate a wildcard type such as `_ >: V`. (SLS 
3.2.10)
can scala actually handle this implementation, or should I convert to normal
Java and avoid the headache?
Martin 
__ 
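
(A hedged sketch, not code from this thread: since KafkaConsumer is a Java
class and therefore invariant in K and V, one way to make that call site
type-check, following Dean's Protocol example above, is to give
createNewConsumer the concrete type it actually builds and drop both the type
parameters and the cast. The property setup is elided; it would be the same
ConsumerConfig settings as in the quoted code below.)

import java.util.ArrayList
import org.apache.kafka.clients.consumer.KafkaConsumer

private def createNewConsumer(): KafkaConsumer[ArrayList[Byte], ArrayList[Byte]] = {
  val properties = new java.util.Properties()
  // ... same ConsumerConfig settings as in the original createNewConsumer ...
  new KafkaConsumer[ArrayList[Byte], ArrayList[Byte]](properties)
}

// The call site then matches exactly, so no downcast is involved:
// consumer = createNewConsumer()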



From: mgai...@hotmail.com
To: mathieu.fenn...@replicon.com; users@kafka.apache.org
Subject: RE: handling generics in Kafka Scala
Date: Tue, 30 Aug 2016 23:00:29 -0400




noob with Scala so I'm looking for an experienced answer
ConsumerGroupCommand.scala
//private def createNewConsumer(): KafkaConsumer[String, String] = {
//private def createNewConsumer(): KafkaConsumer[K extends
// java.util.ArrayList[Byte],V extends java.util.ArrayList[Byte]] = {

private def createNewConsumer(): KafkaConsumer[K <: java.util.ArrayList[Byte],V <: java.util.ArrayList[Byte]] = {

  val properties = new java.util.Properties()
  val deserializer = (new StringDeserializer).getClass.getName
  val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)

  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, opts.options.valueOf(opts.groupOpt))
  properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3")
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)

  if (opts.options.has(opts.commandConfigOpt))
    properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)))

  new KafkaConsumer(properties).asInstanceOf[KafkaConsumer[K,V]]
}
scala-compiler displays:
[ERROR] \kafka\kafka-trunk\core\src\main\scala\kafka\admin\ConsumerGroupCommand.scala:309:
 error: ']' expected but '<:' found.
[ERROR] private def createNewConsumer(): KafkaConsumer[? <: java.util.ArrayList[Byte],? <: java.util.ArrayList[Byte]] = {
[ERROR]                                                  ^
[ERROR] \kafka\kafka-trunk\core\src\main\scala\kafka\admin\ConsumerGroupCommand.scala:309:
 error: '=' expected but ',' found.
[ERROR] private def createNewConsumer(): KafkaConsumer[? <: java.util.ArrayList[Byte],? <: java.util.ArrayList[Byte]] = {
[ERROR]                                                                                ^
[ERROR] \kafka\kafka-trunk\core\src\main\scala\kafka\admin\ConsumerGroupCommand.scala:322:
 error: illegal start of simple expression
I want 2 datatype parameter types extending java.util.ArrayList<Byte>. In
regular Java this would be:
public <K extends java.util.ArrayList<Byte>, V extends java.util.ArrayList<Byte>>
    KafkaConsumer<K, V> createNewConsumer() {}
How do I set up a function or class declaration whose K, V parameter datatypes
extend java.util.ArrayList<Byte>?
Martin 
__ 
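
(A hedged sketch of the shape Dean suggests further down in this digest: in
Scala the type parameters and their bounds go on the method, right after the
method name, rather than inside the KafkaConsumer[...] type. The body is
elided and the final cast is carried over from the original code.)

import java.util.ArrayList
import org.apache.kafka.clients.consumer.KafkaConsumer

private def createNewConsumer[K <: ArrayList[Byte], V <: ArrayList[Byte]](): KafkaConsumer[K, V] = {
  val properties = new java.util.Properties()
  // ... same ConsumerConfig settings as before ...
  new KafkaConsumer(properties).asInstanceOf[KafkaConsumer[K, V]]
}

// Callers then pin the concrete types:
// val consumer = createNewConsumer[ArrayList[Byte], ArrayList[Byte]]()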



> From: mathieu.fenn...@replicon.com
> Date: Wed, 17 Aug 2016 18:06:38 -0600
> Subject: Re: DLL Hell
> To: mgai...@hotmail.com
> 
> Hi Martin,
> 
> I'm sorry, this is way outside my Kafka knowledge.  I'm just a new
> Kafka user who wanted to help with your Windows questions because I
> had just faced the same hurdle. :-)  Wish I could help, but I wouldn't
> know where to start with this.
> 
> Mathieu
> 
> 
> On Wed, Aug 17, 2016 at 6:00 PM, Martin Gainty  wrote:
> > Hi Matthieu
> > Many Thanks for attaching the binary
> >
> > running scala->java generator plugin I see:
> >
> > [ERROR]
> > C:\Maven-plugin\kafka\kafka-trunk\core\src\main\scala\kafka\admin\AdminUtils.scala:639:
> > error: type PartitionMetadata is not a member of object
> > org.apache.kafka.common.requests.MetadataResponse
> >
> > yet when I look at org.apache.kafka.common.requests.MetadataResponse.java I
> > see inner class
> >
> > public static class PartitionMetadata {
> >
> > inner static java classes are not visible to the converter for some reason
> > the workaround seems to be to pull inner static classes (e.g.
> > PartitionMetadata) out as standalone classes;
> > treating the inner class as standalone works
> >
> > Advice?
> > Martin
> > __
> >
> >
> >
> >
> > 
> > From: mathieu.fenn...@replicon.com
> > Date: Tue, 16 Aug 2016 08:04:52 -0600
> > Subject: Re: DLL Hell
> > To: mgai...@hotmail.com

RE: handling generics in Kafka Scala

2016-09-08 Thread Martin Gainty



> From: deanwamp...@gmail.com
> Date: Wed, 31 Aug 2016 10:53:49 -0500
> Subject: Re: handling generics in Kafka Scala
> To: users@kafka.apache.org
> 
> Okay, the type parameters with the variances need to be after the method
> name, like this:
> 
> private def createNewConsumer[K <: java.util.ArrayList[Byte],V <:
> java.util.ArrayList[Byte]](): KafkaConsumer[K,V] = {...}
> 
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
>  (O'Reilly)
> Lightbend 
> @deanwampler 
> http://polyglotprogramming.com
> 
> On Wed, Aug 31, 2016 at 8:08 AM, Martin Gainty  wrote:
> 
> > supposedly gmail wont strip \n so we'll try again with non-html mail
> > composer
> >
> >  noob with Scala so Im looking for an experienced answer
> >
> >  ConsumerGroupCommand.scala
> >
> >  //private def createNewConsumer(): KafkaConsumer[String, String] = {
> >  //private def createNewConsumer(): KafkaConsumer[K extends
> > // java.util.ArrayList[Byte],V extends java.util.ArrayList[Byte]] = {
> >
> >  private def createNewConsumer(): KafkaConsumer[K <:
> >  java.util.ArrayList[Byte],V <: java.util.ArrayList[Byte]] = {
> >
> >val properties = new java.util.Properties()
> >
> >val deserializer = (new StringDeserializer).getClass.getName
> >
> >val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)
> >
> >  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
> >
> >  properties.put(ConsumerConfig.GROUP_ID_CONFIG,opts.options.
> > valueOf(opts.groupOpt))
> >
> >properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
> >
> >properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3")
> >
> >properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> >  deserializer)
> >
> >   properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> >  deserializer)
> >
> >if (opts.options.has(opts.commandConfigOpt))
> >  properties.putAll(Utils.loadProps(opts.options.
> > valueOf(opts.commandConfigOpt)))
> >
> >new KafkaConsumer(properties).asInstanceOf[KafkaConsumer[K,V]]
> >  }
> >
> > scala-compiler displays:
> >  [ERROR] \kafka\kafka-trunk\core\src\main\scala\kafka\admin\
> > ConsumerGroupCommand.scala:309:
> >  error: ']' expected but '<:' found.
> >
> > [ERROR] private def createNewConsumer(): KafkaConsumer[? <:
> >  java.util.ArrayList[Byte],? <: java.util.ArrayList[Byte]] = {
> >  [ERROR]  ^
> >
> >  [ERROR]
> >  \kafka\kafka-trunk\core\src\main\scala\kafka\admin\
> > ConsumerGroupCommand.scala:309:
> >  error: '=' expected but ',' found.
> >
> >  [ERROR] private def createNewConsumer(): KafkaConsumer[? <:
> >  java.util.ArrayList[Byte],? <: java.util.ArrayList[Byte]] = {
> >  [ERROR]
> >^
> >
> >  [ERROR]
> >  \kafka\kafka-trunk\core\src\main\scala\kafka\admin\
> > ConsumerGroupCommand.scala:322:
> >  error: illegal start of simple expression
> >
> >  i need 2 datatype parameter types extending java.util.ArrayList
> >  in regular java this would be:
> >
> > public <K extends java.util.ArrayList<Byte>, V extends
> >  java.util.ArrayList<Byte>> KafkaConsumer<K, V> createNewConsumer() { }
> >
> >  this works in java but
> >  how do I setup a function or class declaration K, V whose parameter
> >  datatype extends java.util.ArrayList ?
> >
> >  Martin
> >
> > >>
> > >> From: mgai...@hotmail.com
> > >> To: mathieu.fenn...@replicon.com; users@kafka.apache.org
> > >> Subject: RE: handling generics in Kafka Scala
> > >> Date: Tue, 30 Aug 2016 23:00:29 -0400
> > >>
> > >>
> > >>
> > >>
> > >> noob with Scala so Im looking for an experienced answer
> > >> ConsumerGroupCommand.scala
> > >> //private def createNewConsumer(): KafkaConsumer[String, String] =
> > >> {//private def createNewConsumer(): KafkaConsumer[K extends
> > >> java.util.ArrayList[Byte],V extends java.util.ArrayList[Byte]] = {
> > >> private def createNewConsumer(): KafkaConsumer[K <:
> > >> java.util.ArrayList[Byte],V <: java.util.ArrayList[Byte]] = {  val
> > >> properties = new java.util.Properties()  val deserializer = (new
> > >> StringDeserializer).getClass.getName  val brokerUrl =
> > >> opts.options.valueOf(opts.bootstrapServerOpt)
> > >> properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
> > >> properties.put(ConsumerConfig.GROUP_ID_CONFIG,
> > >> opts.options.valueOf(opts.groupOpt))
> > >> properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
> > >> properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3")
> > >> properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >> deserializer)
> > >>properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >> deserializer)  if (opts.options.has(opts.commandConfigOpt))
> > >> properties.putAll(Utils.loadProps(opts.options.
> > valueOf(opts.commandConfigOpt)))
> > >>  new KafkaConsumer(properties).asIn

enhancing KStream DSL

2016-09-08 Thread Ara Ebrahimi
Let’s say I have this:


KStream[] branches = allRecords
.branch(
(imsi, callRecord) -> 
"VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
(imsi, callRecord) -> 
"DATA".equalsIgnoreCase(callRecord.getCallCommType()),
(imsi, callRecord) -> true
);
KStream callRecords = branches[0];
KStream dataRecords = branches[1];
KStream callRecordCounter = branches[2];

callRecordCounter
.map((imsi, callRecord) -> new KeyValue<>("", ""))
.countByKey(
UnlimitedWindows.of("counter-window"),
stringSerde
)
.print();

Here I have 3 branches. Branch 0 is triggered if the data is VOICE, branch 1 if the data
is DATA. Branch 2 is supposed to be triggered regardless of type, so that
I can then count stuff for a time window. BUT the problem is that branch is
implemented like this:

private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
@Override
public void process(K key, V value) {
for (int i = 0; i < predicates.length; i++) {
if (predicates[i].test(key, value)) {
// use forward with childIndex here and then break the loop
// so that no record is going to be piped to multiple streams
context().forward(key, value, i);
break;
}
}
}
}

Note the break. So the counter branch is never reached. I'd like to change the
behavior of branch so that all predicates are checked and no break happens, in,
say, a branchAll() method. What's the easiest way to add this functionality to the
DSL? I tried process() but it doesn't return a KStream.
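
(Not the branchAll() change itself, just a hedged workaround sketch, written in
Scala to match the other examples in this digest against the same 0.10 API: a
KStream can feed more than one downstream operator, so the catch-all count can
hang off allRecords directly instead of going through a third branch. The
key/value types String/CallRecord and the stringSerde name are assumptions
carried over from the snippet above.)

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KeyValueMapper, UnlimitedWindows}

// Count every record, independently of the VOICE/DATA branches.
allRecords
  .map(new KeyValueMapper[String, CallRecord, KeyValue[String, String]] {
    override def apply(imsi: String, callRecord: CallRecord): KeyValue[String, String] =
      new KeyValue("", "")
  })
  .countByKey(UnlimitedWindows.of("counter-window"), stringSerde)
  .print()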

Ara.










Re: KIP-33 Opt out from Time Based indexing

2016-09-08 Thread Jan Filipiak

Hi Jun,

thanks a lot for the hint, I'll check it out when I get a free minute!

Best Jan

On 07.09.2016 00:35, Jun Rao wrote:

Jan,

For the time rolling issue, Jiangjie has committed a fix (
https://issues.apache.org/jira/browse/KAFKA-4099) to trunk. Perhaps you can
help test out trunk and see if there are any other issues related to
time-based index?

Thanks,

Jun

On Mon, Sep 5, 2016 at 11:52 PM, Jan Filipiak 
wrote:


Hi Jun,

sorry for the late reply. Regarding B, my main concern was just the complexity
of understanding what's going on.
As you can see, it took me probably some 2 days or so to fully grasp all
the details in the implementation and what
the impacts are. Usually I prefer to turn things I don't use off, so I
don't have to bother. Log append time will work for me.

Rolling logs were my main concern. The producer can specify the timestamp,
and we use an epoch timestamp inside the message; I'd bet money
people in the company would have put this epoch also in the produce
record => rolling logs, as the broker thinks it's millis.
So that would probably have caused us at least one outage if a big
producer had upgraded and done this; IMO a likely mistake.

I'd just hoped for a more obvious kill-switch, so I didn't need to bother
that much.

Best Jan





On 29.08.2016 19:36, Jun Rao wrote:


Jan,

For the usefulness of time index, it's ok if you don't plan to use it.
However, I do think there are other people who will want to use it. Fixing
an application bug always requires some additional work. Intuitively,
being
able to seek back to a particular point of time for replay is going to be
much more efficient than always replaying from the very beginning,
especially when the log is retained for a long period of time. Sure, if
you
want to have more confidence, you want to rewind a bit conservatively. But
being able to rewind an extra hour makes a big difference from having to
rewind all the way to 7 days or however long the retention time is.
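
(For illustration only: a minimal sketch of such a time-based seek using the
consumer API that shipped after this thread, offsetsForTimes in 0.10.1+; the
topic-partition and the one-hour rewind are placeholders.)

import java.util.Collections
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

def seekToOneHourAgo(consumer: KafkaConsumer[String, String], tp: TopicPartition): Unit = {
  // assumes tp is already assigned to this consumer
  val target = java.lang.Long.valueOf(System.currentTimeMillis() - 60 * 60 * 1000L)
  val offsets = consumer.offsetsForTimes(Collections.singletonMap(tp, target))
  Option(offsets.get(tp)).foreach(ot => consumer.seek(tp, ot.offset()))
}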

For the OffsetRequest, I actually agree with you that it's useful. People
can use that to find the first and the last offset and the offset based on
a specific point in time. The part that's a bit awkward with OffsetRequest
is that it's based on the last modified time of the log segment, which
makes it imprecise (precision is at the segment level, not message level)
and non-deterministic (last modified time may change). Another awkwardness
is that it supports returning a list of offsets after a specified
timestamp. We did that simply because timestamp was only at the segment
level then. So, our plan is to replace OffsetRequest with a new one. It
will give you the same functionality: find the first and the last offset
and the offset based on a specific point in time. It will just be better
since it's more precise and more deterministic. For your use case, it
seems
that you don't care about message creation time. Then, it's possible for
you to configure the broker with the log append time. Whether this should
be default at the Kafka level is debatable, but it won't prevent your use
case.
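
(A hedged aside: the broker-side switch for this is, as far as I know, the
log.message.timestamp.type setting that came with the 0.10 message format;
treat the property name as an assumption rather than something quoted in this
thread.)

# server.properties: stamp messages with broker (log append) time
# instead of the producer-supplied create time
log.message.timestamp.type=LogAppendTime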

For your suggestion on refactoring, I still want to understand how
necessary it is. Your main concerns so far seem to be:
(a) Impact on rolling log segments.
(b) Time-based index is not useful for me.

Item (a) is a good point. Thanks for that. We will fix it. On item (b), I have
given my view above. Are there any other ways in which you think
having a time-based index will hurt?

Thanks,

Jun

On Fri, Aug 26, 2016 at 3:41 PM, Jan Filipiak 
wrote:

Hi Jun,

thanks for taking the time to answer on such a detailed level. You are
right, Log.fetchOffsetByTimestamp works; the comment is just confusing:
"// Get all the segments whose largest timestamp is smaller than target
timestamp", which apparently is not what takeWhile does (I am more on
the Java end of things, so I relied on the comment).

Regarding the frequent file rolling, I didn't think of log compaction, but
that indeed is a place where things can hit the fan pretty easily, especially
if you don't have many updates in there and you pass the timestamp along in
a kafka-streams application. Bootstrapping a new application then indeed
could produce quite a few old messages, kicking this log rolling off until a
recent message appears. I guess that makes it a practical issue again even
with the 7 days. Thanks for pointing that out! I'd like to see the append time
as the default; I am very happy that I have it in the back pocket for purposes
of tighter sleep, so I don't have to worry too much about someone accidentally
doing something dodgy on a weekend with our clusters.

Regarding the usefulness, you will not be able to sell it to me. I don't
know how people build applications with this ¯\_(ツ)_/¯ but I don't want to
see them.
Look at the error recovery with timestamp seek:
For fixing a bug, a user needs to stop the SP, truncate all his downstream
data perfectly based on their time window. Then restart and do the first fetch
based again on the perfect window timeout. From then 

Kafka Streams windows strange behavior

2016-09-08 Thread Adrienne Kole
Hi,

I am trying to implement a simple scenario on the streams library of Kafka.

I insert data into a Kafka topic at 1 tuple/second.
The streams library is connected to the particular topic and what it does is:
1. construct 8-second windows with a 4-second sliding time,
2. sum the values of tuples (price field) and set a count field
(possibly to find the average)
3. output the results

So one would expect aggregation results more or less once
per 4 seconds.
However, I get 2 outputs per second.

After analyzing a bit, I can conclude that for each tuple the streams library
computes an aggregation result with each sub-window aggregator (as each tuple
can be part of several windows), and sends it directly downstream. It keeps
the aggregated object in sub-windows, so when a new object comes it is updated,
to make it non-blocking.

However, I see that the result of a window is not updated w.r.t. previous
values. That is, if I set a count field on the aggregated object, I see
count=1 for all of them.

Here is my code:



    public static void main(String[] args) throws Exception {

        if (args == null || args.length != 1) {
            throw new Exception("commandline argument is needed: path to configuration file");
        }
        String confPath = args[0];
        YamlReader reader = new YamlReader(new FileReader(confPath));
        Object object = reader.read();
        Map conf = (Map) object;

        ArrayList bootstrapServersArr = (ArrayList) conf.get("kafka.brokers");
        Integer kafkaPort = new Integer(conf.get("kafka.port").toString());
        String bootstapServers = conf.get("bootstrap.servers").toString();
        String zookeeperServers = conf.get("zoo.servers").toString();

        String kafkaTopic = conf.get("kafka.topic").toString();
        String dataGeneratorHost = InetAddress.getLocalHost().getHostName();
        Integer dataGeneratorPort = new Integer(conf.get("datasourcesocket.port").toString());

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-benchmark");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstapServers);
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperServers);
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        int slideWindowLength = new Integer(conf.get("slidingwindow.length").toString()); // 8000 ms
        int slideWindowSlide = new Integer(conf.get("slidingwindow.slide").toString());   // 4000 ms

        KStreamBuilder builder = new KStreamBuilder();
        KStream<Long, String> source = builder.stream(Serdes.Long(), Serdes.String(), kafkaTopic);

        KStream<Long, Double> tsAppended = source.map(new KeyValueMapper<Long, String, KeyValue<Long, Double>>() {
            @Override
            public KeyValue<Long, Double> apply(Long aLong, String s) {
                JSONObject obj = new JSONObject(s);
                Double val = obj.getJSONObject("m").getDouble("price");
                Long id = obj.getJSONObject("s").getLong("aid1");

                return new KeyValue<>(id, val);
            }
        });

        KTable<Windowed<Long>, AvgValue> windowAgg = tsAppended.aggregateByKey(
                new Initializer<AvgValue>() {
                    @Override
                    public AvgValue apply() {
                        return new AvgValue(0, 0D, 0L);
                    }
                },
                new AvgAggregator(),
                TimeWindows.of("timewindow", slideWindowLength).advanceBy(slideWindowSlide),
                Serdes.Long(), new AvgSerde()
        );

        KTable<Windowed<Long>, AvgValue> windowAggResult = windowAgg.mapValues((v) -> {
            System.out.println(v.getSum() + " - " + v.getCount());
            return v;
        });

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }


    public static class AvgAggregator implements Aggregator<Long, Double, AvgValue> {
        @Override
        public AvgValue apply(Long id, Double price, AvgValue avg) {
            avg.setSum(avg.getSum() + price);
            avg.setCount(avg.getCount() + 1);
            return avg;
        }
    }

}



I output each tuple where it is created and check it against the value output
by kafka-streams.

price - count

29.78169        // this is generator
29.78169 - 1    // this is kafka-streams
29.78169 - 1    // this is kafka-streams
30.767307       // this is generator
30.767307 - 1   // this is kafka-streams
30.767307 - 1   // this is kafka-streams
8.034571        // this is generator
8.034571 - 1    // this is kafka-streams
8.034571 - 1    // this is kafka-streams
79.36154        // this is generat

org.apache.kafka.common.errors.TimeoutException: Batch Expired

2016-09-08 Thread Sharath Raghavan
Hello everyone,

We have seen this exception couple of times -
"org.apache.kafka.common.errors.TimeoutException: Batch Expired".

It doesn't happen all the time, but mostly during hiccups in the network. There
is an open JIRA, KAFKA-3686.

I'm planning to try higher values for request.timeout.ms (up to 15000
or so) in a few days.


*Question:*

Did anyone face this issue? If so, were you able to tweak the producer to
minimize it?

Suggestions are welcome.


*Other details:*

We use Kafka 0.9.0.1, async producer.

Producer properties:

batch.size=921600

linger.ms=0

request.timeout.ms=5000

acks=1

retries=5

compression.type=none

and mostly default values for other producer properties.


*Thanks,*

*Sharath*

-- 
Best Regards,
Sharath