RE: Kafka Producer - Multiple broker - Data sent to buffer but not in Queue

2017-04-18 Thread Ranjith Anbazhakan
Unfortunately, there is no specific information (error/exception) in the logs 
when the buffered records go missing from the queue, i.e. when the stopped broker 
(say broker 2) is started and the currently running broker that received all the 
producer-sent records in its buffer (say broker 1) is then stopped.

Thanks,
Ranjith

-Original Message-
From: David Garcia [mailto:]
Sent: Wednesday, April 19, 2017 09:31
To: users@kafka.apache.org
Subject: Re: Kafka Producer - Multiple broker - Data sent to buffer but not in 
Queue

What do broker logs say around the time you send your messages?

On 4/18/17, 3:21 AM, "Ranjith Anbazhakan"  
wrote:

Hi,

I have been testing the behavior of multiple Kafka broker instances on the same 
machine and am seeing inconsistent behavior: records the producer sends to its 
buffer are not always available in the queue.

Tried kafka versions:
0.10.2.0
0.10.1.0

Scenario:

1.   Ran two broker instances on the same machine, say broker 1 as the initial 
leader and broker 2 as the initial follower.

2.   Stopped broker 1. Broker 2 now became the leader.

3.   The producer now sends records for the topic TEST through the send() 
method, followed by flush(). The records should logically go to Broker 2. No 
error/exception is thrown by the code, so it is assumed the data has been sent 
successfully to the buffer.

4.   When using the command to check the record count for the TEST topic on 
Broker 2, the sent records are not added to the existing record count for that 
topic in the queue.

a.   Used command - kafka-run-class.bat kafka.tools.GetOffsetShell 
--broker-list localhost:9094 --topic TEST --time -1 (where TEST is the used 
topic)

NOTE: **Step 4 does not happen every time; it is inconsistent**. In the 
scenario where the records do not show up, if Broker 1 is brought UP and then 
brought DOWN again, the records are always available in the queue on Broker 2 
after doing step 3. (A small sketch of verifying delivery on the producer side 
is shown below.)
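
A successful send() only means the record was accepted into the producer's 
buffer; whether it actually reached the broker is only visible through the 
returned Future or a completion callback. A minimal sketch of checking this, 
assuming the Java producer (broker addresses below are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendAndVerify {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9091,localhost:9094"); // placeholder brokers
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // get() blocks until the broker acknowledges the record (or throws on
            // failure), so a missing ack surfaces as an exception instead of silent loss.
            RecordMetadata md = producer.send(new ProducerRecord<>("TEST", "key", "value")).get();
            System.out.printf("acked: partition=%d offset=%d%n", md.partition(), md.offset());
        } finally {
            producer.close();
        }
    }
}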

Configurations:
Overall Producer configurations: (most are default values)
acks = all
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers =  , 
buffer.memory = 33554432
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class 
org.apache.kafka.common.serialization.StringSerializer
linger.ms = 1
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class 
org.apache.kafka.common.serialization.StringSerializer

Broker 1: (server.properties)
broker.id=1
port=9091
advertised.host.name=
auto.create.topics.enable=true
default.replication.factor=2
leader.imbalance.check.interval.seconds=20
topic.metadata.refresh.interval.ms=-1

Broker 2: (server1.properties)
broker.id=2
port=9094
advertised.host.name=
auto.create.topics.enable=true
default.replication.factor=2
leader.imbalance.check.interval.seconds=20
topic.metadata.refresh.interval.ms=-1

Re: Kafka Producer - Multiple broker - Data sent to buffer but not in Queue

2017-04-18 Thread David Garcia
What do broker logs say around the time you send your messages?

On 4/18/17, 3:21 AM, "Ranjith Anbazhakan"  
wrote:

Hi,

I have been testing the behavior of multiple Kafka broker instances on the same 
machine and am seeing inconsistent behavior: records the producer sends to its 
buffer are not always available in the queue.

Tried kafka versions:
0.10.2.0
0.10.1.0

Scenario:

1.   Ran two broker instances on the same machine, say broker 1 as the initial 
leader and broker 2 as the initial follower.

2.   Stopped broker 1. Broker 2 now became the leader.

3.   The producer now sends records for the topic TEST through the send() 
method, followed by flush(). The records should logically go to Broker 2. No 
error/exception is thrown by the code, so it is assumed the data has been sent 
successfully to the buffer.

4.   When using the command to check the record count for the TEST topic on 
Broker 2, the sent records are not added to the existing record count for that 
topic in the queue.

a.   Used command - kafka-run-class.bat kafka.tools.GetOffsetShell 
--broker-list localhost:9094 --topic TEST --time -1 (where TEST is the used 
topic)

NOTE: **Step 4 does not happen every time; it is inconsistent**. In the 
scenario where the records do not show up, if Broker 1 is brought UP and then 
brought DOWN again, the records are always available in the queue on Broker 2 
after doing step 3.

Configurations:
Overall Producer configurations: (most are default values)
acks = all
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers =  , 
buffer.memory = 33554432
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class 
org.apache.kafka.common.serialization.StringSerializer
linger.ms = 1
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class 
org.apache.kafka.common.serialization.StringSerializer

Broker 1: (server.properties)
broker.id=1
port=9091
advertised.host.name=
auto.create.topics.enable=true
default.replication.factor=2
leader.imbalance.check.interval.seconds=20
topic.metadata.refresh.interval.ms=-1

Broker 2: (server1.properties)
broker.id=2
port=9094
advertised.host.name=
auto.create.topics.enable=true
default.replication.factor=2
leader.imbalance.check.interval.seconds=20
topic.metadata.refresh.interval.ms=-1

Do let me know if anyone has faced similar scenarios and why such an issue 
occurs. Let me know if any more details are needed.

Thanks,
Ranjith
[Aspire Systems]

This e-mail message and any attachments are for the sole use of the 
intended recipient(s) and may contain proprietary, confidential, trade secret 
or privileged information. Any unauthorized review, use, disclosure or 

Re: offset commitment from another client

2017-04-18 Thread Ewen Cheslack-Postava
Consumers are responsible for committing offsets, not brokers. See
http://kafka.apache.org/documentation.html#design_consumerposition for more
of an explanation of how this is tracked. The brokers help coordinate
this/store the offsets, but it is the consumers that decide when to commit
offsets (indicating they have processed the data).
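
For example, here is a minimal sketch of a consumer that disables auto-commit 
and commits each offset only after the external processing has finished (topic, 
group id, and the processing call are placeholders, not from the original setup):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitAfterProcessing {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "image-processors");
        props.put("enable.auto.commit", "false"); // commit manually, only after processing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("images"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    processMessage(record.value()); // placeholder for the containerized work
                    // Commit this record's offset + 1 once processing has finished.
                    consumer.commitSync(Collections.singletonMap(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        }
    }

    private static void processMessage(String value) { /* placeholder */ }
}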

-Ewen

On Mon, Mar 27, 2017 at 2:56 AM, Vova Shelgunov  wrote:

> Hi,
>
> I have an application which consumes messages from Kafka, then it creates a
> Docker container via Mesos which processes incoming message (image), but I
> need to commit an offset only once message is processed inside a Docker
> container. So basically I need to commit offset from another broker (that
> is running in a container).
>
> Will it work?
>
> Thanks
>


Re: even if i pass key no change in partition

2017-04-18 Thread Ewen Cheslack-Postava
Do you have more than 1 partition? You may have an auto-created topic with
only 1 partition, in which case the partition of messages will *always* be
the same, regardless of key.
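
For illustration, a simplified stand-in for what the default partitioner does 
(the real implementation hashes the serialized key with murmur2; this sketch 
just uses a plain array hash): the hash is taken modulo the partition count, so 
with a single partition every key lands on partition 0.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionDemo {
    // Simplified stand-in for key-based partition selection; with one
    // partition the result is always 0, regardless of the key.
    static int choosePartition(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[]{"172", "173", "174"}) {
            System.out.printf("key=%s -> with 1 partition: %d, with 8 partitions: %d%n",
                    key, choosePartition(key, 1), choosePartition(key, 8));
        }
    }
}

The number of partitions used for auto-created topics comes from the broker's 
num.partitions setting, which defaults to 1.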

-Ewen

On Fri, Mar 24, 2017 at 5:52 AM, Laxmi Narayan  wrote:

> Hi,
>
> I am passing key in producer but still no change in partition.
>
>
> I can see in producer response key value but no change in partition.
>
> *This is how my props looks:*
>
> props.put("bootstrap.servers",  "localhost:9092");
> props.put("group.id",   "test");
> props.put("enable.auto.commit", "true");
> props.put("auto.commit.interval.ms","1000");
> props.put("session.timeout.ms", "3");
> props.put("linger.ms",  "1");
> props.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("key.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("partitioner.class",
> "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
>
>
>
> *This is my output : *
> ConsumerRecord(topic = test, partition = 0, offset = 449, CreateTime =
> 1490359767112, checksum = 1987036294, serialized key size = 3, serialized
> value size = 16, key = 172, value = hello world - 86)
>
>
>
>
>
> *Regards,*
> *Laxmi Narayan Patel*
> *MCA NIT Durgapur (2011-2014)*
> *Mob:-9741292048,8345847473*
>


Re: MirrorMaker as a Background process

2017-04-18 Thread Greenhorn Techie
Fantastic. Thanks Manikumar.

On Tue, 18 Apr 2017 at 14:39 Manikumar  wrote:

> you can run as a background process by passing "-daemon" option to
> kafka-run-class.sh script.
> This script uses  nohup to run as background process.
>
> ex: kafka-run-class.sh -daemon kafka.tools.MirrorMaker  ... ...
>
> On Tue, Apr 18, 2017 at 6:27 PM, Greenhorn Techie <
> greenhorntec...@gmail.com
> > wrote:
>
> > Hi,
> >
> > I am still novice to Kafka and hence this rudimentary question.
> >
> > When I run MirrorMaker process from a bash cli, the cli prompt doesn't
> > return once I run the MirrorMaker command. Does that mean, I should only
> > run MirrorMaker as a background process if I need to have it running all
> > the time? We are backing up  Kafka data from one cluster to another and
> > using MirrorMaker for the same. Hence this is needed to be running all
> the
> > time once initiated.
> >
> > I am running it by passing nohup signal as well to the MirrorMaker
> command.
> >
> > Thanks
> >
>


Kafka streaming from RDBMS to RDBMS

2017-04-18 Thread Venkata B Nagothi
Hi Kafka Community,

I would like to know whether Kafka can stream data that includes large
objects (images and videos - traditionally known as BLOBs) from one
RDBMS to another with better performance.

Is this something Kafka is good at?

I understand that Kafka can do it, and I would like to know how much better it
is compared to traditional ETL tools.

I have used ETL tools like Talend and similar to migrate data from one RDBMS to
another; I am looking for an option that is more real-time and faster, with
better performance. Any help would be appreciated.

Regards,

Venkata B N
Database Consultant


Re: ZK and Kafka failover testing

2017-04-18 Thread Hans Jespersen
When you publish, is acks=0,1 or all (-1)?
What is max.in.flight.requests.per.connection (default is 5)?

It sounds to me like your publishers are using acks=0 and so they are not
actually succeeding in publishing (i.e. you are getting no acks) but they
will retry over and over and will have up to 5 retries in flight, so when
the broker comes back up, you are getting 4 or 5 copies of the same
message.

Try setting max.in.flight.requests.per.connection=1 to get rid of duplicates
Try setting acks=all to ensure the messages are being persisted by the
leader and all the available replicas in the kafka cluster.
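
As a rough sketch, those settings on the Java producer would look something 
like the following; the host names are placeholders and the retry count is 
illustrative:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class SafePublisherConfig {
    public static KafkaProducer<String, String> build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder hosts
        props.put("acks", "all");                                    // leader + in-sync replicas must ack
        props.put("retries", "3");                                   // retry transient send failures
        props.put("max.in.flight.requests.per.connection", "1");     // only one request in flight at a time
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }
}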

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Tue, Apr 18, 2017 at 4:10 PM, Shrikant Patel  wrote:

> Hi All,
>
> I am seeing strange behavior between ZK and Kafka. We have 5 node in ZK
> and Kafka cluster each. Kafka version - 2.11-0.10.1.1
>
> The min.insync.replicas is 3, replication.factor is 5 for all topics,
> unclean.leader.election.enable is false. We have 15 partitions for each
> topic.
>
> The step we are following in our testing.
>
>
> * My understanding is that ZK needs aleast 3 out of 5 server to be
> functional. Kafka could not be functional without zookeeper. In out
> testing, we bring down 3 ZK nodes and don't touch Kafka nodes. Kafka is
> still functional, consumer\producer can still consume\publish from Kafka
> cluster. We then bring down all ZK nodes, Kafka consumer\producers are
> still functional. I am not able to understand why Kafka cluster is not
> failing as soon as majority of ZK nodes are down. I do see error in Kafka
> that it cannot connection to ZK cluster.
>
>
>
> * With all or majority of ZK node down, we bring down 1 Kafka
> nodes (out of 5, so 4 are running). And at that point the consumer and
> producer start failing. My guess is the new leadership election cannot
> happen without ZK.
>
>
>
> * Then we bring up the majority of ZK node up. (1st Kafka is still
> down) Now the Kafka cluster become functional, consumer and producer now
> start working again. But Consumer sees big junk of message from kafka, and
> many of them are duplicates. It's like these messages were held up
> somewhere, Where\Why I don't know?  And why the duplicates? I can
> understand few duplicates for messages that consumer would not commit
> before 1st node when down. But why so many duplicates and like 4 copy for
> each message. I cannot understand this behavior.
>
> Appreciate some insight about our issues. Also if there are blogs that
> describe the ZK and Kafka failover scenario behaviors, that would be
> extremely helpful.
>
> Thanks,
> Shri
>
> This e-mail and its contents (to include attachments) are the property of
> National Health Systems, Inc., its subsidiaries and affiliates, including
> but not limited to Rx.com Community Healthcare Network, Inc. and its
> subsidiaries, and may contain confidential and proprietary or privileged
> information. If you are not the intended recipient of this e-mail, you are
> hereby notified that any unauthorized disclosure, copying, or distribution
> of this e-mail or of its attachments, or the taking of any unauthorized
> action based on information contained herein is strictly prohibited.
> Unauthorized use of information contained herein may subject you to civil
> and criminal prosecution and penalties. If you are not the intended
> recipient, please immediately notify the sender by telephone at
> 800-433-5719 or return e-mail and permanently delete the original e-mail.
>


ZK and Kafka failover testing

2017-04-18 Thread Shrikant Patel
Hi All,

I am seeing strange behavior between ZK and Kafka. We have 5 nodes each in the ZK 
and Kafka clusters. Kafka version - 2.11-0.10.1.1

The min.insync.replicas is 3, replication.factor is 5 for all topics, 
unclean.leader.election.enable is false. We have 15 partitions for each topic.

The steps we are following in our testing:


* My understanding is that ZK needs at least 3 out of 5 servers to be 
functional, and Kafka cannot be functional without ZooKeeper. In our testing, we 
bring down 3 ZK nodes and don't touch the Kafka nodes. Kafka is still functional; 
consumers/producers can still consume/publish from the Kafka cluster. We then bring 
down all ZK nodes, and Kafka consumers/producers are still functional. I am not able 
to understand why the Kafka cluster does not fail as soon as the majority of ZK nodes 
are down. I do see errors in Kafka that it cannot connect to the ZK cluster.



* With all or a majority of ZK nodes down, we bring down 1 Kafka node 
(out of 5, so 4 are running). At that point the consumers and producers start 
failing. My guess is that a new leader election cannot happen without ZK.



* Then we bring the majority of ZK nodes back up (the 1st Kafka node is still 
down). Now the Kafka cluster becomes functional, and the consumers and producers 
start working again. But the consumer sees a big chunk of messages from Kafka, and 
many of them are duplicates. It's like these messages were held up somewhere - 
where/why I don't know. And why the duplicates? I can understand a few duplicates 
for messages the consumer could not commit before the 1st node went down, but why 
so many duplicates, something like 4 copies of each message? I cannot understand 
this behavior.

I would appreciate some insight into our issues. Also, if there are blogs that 
describe ZK and Kafka failover scenario behaviors, that would be extremely helpful.

Thanks,
Shri

This e-mail and its contents (to include attachments) are the property of 
National Health Systems, Inc., its subsidiaries and affiliates, including but 
not limited to Rx.com Community Healthcare Network, Inc. and its subsidiaries, 
and may contain confidential and proprietary or privileged information. If you 
are not the intended recipient of this e-mail, you are hereby notified that any 
unauthorized disclosure, copying, or distribution of this e-mail or of its 
attachments, or the taking of any unauthorized action based on information 
contained herein is strictly prohibited. Unauthorized use of information 
contained herein may subject you to civil and criminal prosecution and 
penalties. If you are not the intended recipient, please immediately notify the 
sender by telephone at 800-433-5719 or return e-mail and permanently delete the 
original e-mail.


Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread David Garcia
The “NewShinyProducer” is also deprecated.

On 4/18/17, 5:41 PM, "David Garcia"  wrote:

The console producer in the 0.10.0.0 release uses the old producer which 
doesn’t have “backoff”…it’s really just for testing simple producing:

object ConsoleProducer {

  def main(args: Array[String]) {

try {
val config = new ProducerConfig(args)
val reader = 
Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
reader.init(System.in, getReaderProps(config))

val producer =
  if(config.useOldProducer) {
new OldProducer(getOldProducerProps(config))
  } else {
new NewShinyProducer(getNewProducerProps(config))
  }



On 4/18/17, 5:31 PM, "Robert Quinlivan"  wrote:

I am curious how your producer is configured. The producer maintains an
internal buffer of messages to be sent over to the broker. Is it 
possible
you are terminating the producer code in your test before the buffer is
exhausted?

On Tue, Apr 18, 2017 at 5:29 PM, jan  wrote:

> Thanks to both of you. Some quick points:
>
> I'd expect there to be backpressure from the producer if the broker is
> busy ie. the broker would not respond to the console producer if the
> broker was too busy accept more messages, and the producer would hang
> on the socket. Alternatively I'd hope the console producer would have
> the sense to back off and retry but clearly(?) not.
> This behaviour is actually relevant to my old job so I need to know 
more.
>
> Perhaps the timeout mentioned in the error msg can just be upped?
>
> *Is* the claimed timeout relevant?
> > Batch containing 8 record(s) expired due to timeout while requesting
> metadata from brokers for big_ptns1_repl1_nozip-0
>
> Why is the producer expiring records?
>
> But I'm surprised this happened because my setup is one machine with
> everything running on it. No network. Also Kafka writes to the disk
> without an fsync (or its equivalent on windows) which means it just
> gets cached in ram before being lazily written to disk, and I've got
> plenty of ram - 16GB ram vs 5GB of input file. Kafka adds its overhead
> so it grows to ~8GB but still, it need not hit disk at all (and the
> file goes into the windows memory, not java's).
> Maybe it is GC holding things up but I dunno, GC even for a second or
> two should not cause a socket failure, just delay the read, though I'm
> not an expert on this *at all*.
>
> I'll go over the answers tomorrow more carefully but thanks anyway!
>
> cheers
>
> jan
>
> On 18/04/2017, Serega Sheypak  wrote:
> >> err, isn't it supposed to? Isn't the loss of data a very serious 
error?
> > Kafka can't fix networking issues like latencies, blinking,
> unavailability
> > or any other weird stuff. Kafka promises you to persist data if data
> > reaches Kafka. Data delivery responsibility to kafka is on your 
side. You
> > fail to do it according to logs.
> >
> > 0.02% not 2%
> > You should check broker logs to figure out what went wrong. All 
things
> > happen on one machine as far as I understand. Maybe your brokers 
don't
> have
> > enough mem and they stuck because of GC and don't respond to 
producer.
> > Async producer fails to send data. That is why you observe data 
loss on
> > consumer side.
> >
> >
> > 2017-04-18 23:32 GMT+02:00 jan :
> >
> >> Hi Serega,
> >>
> >> > data didn't reach producer. So why should data appear in 
consumer?
> >>
> >> err, isn't it supposed to? Isn't the loss of data a very serious 
error?
> >>
> >> > loss rate is more or less similar [...] Not so bad.
> >>
> >> That made me laugh at least.  Is kafka intended to be a reliable
> >> message delivery system, or is a 2% data loss officially 
acceptable?
> >>
> >> I've been reading the other threads and one says windows is really 
not
> >> supported, and certainly not for production. Perhaps that's the 
root
> >> of it. Well I'm hoping to try it on linux shortly so I'll see if I 
can
> >> replicate the issue but I would like to know whether it *should* 
work
> >> in windows.
> >>
> >> cheers
> >>
> >> jan
> >>
> >> On 18/04/2017, Serega Sheypak  wrote:
> >> > Hi,
> >> >
> >> > [2017-04-17 18:14:05,868] ERROR Error when sending message to 
topic
> >> >

Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread David Garcia
The console producer in the 0.10.0.0 release uses the old producer which 
doesn’t have “backoff”…it’s really just for testing simple producing:

object ConsoleProducer {

  def main(args: Array[String]) {

try {
val config = new ProducerConfig(args)
val reader = 
Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
reader.init(System.in, getReaderProps(config))

val producer =
  if(config.useOldProducer) {
new OldProducer(getOldProducerProps(config))
  } else {
new NewShinyProducer(getNewProducerProps(config))
  }
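
By contrast, the new producer (org.apache.kafka.clients.producer.KafkaProducer) 
does expose retry and backoff settings. A minimal sketch, with illustrative 
values and placeholder host/topic names:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RetryingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("retries", "5");                        // resend on transient errors
        props.put("retry.backoff.ms", "500");             // wait between retries
        props.put("max.block.ms", "60000");               // how long send() may block on metadata/buffer space
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "hello"));
        }
    }
}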



On 4/18/17, 5:31 PM, "Robert Quinlivan"  wrote:

I am curious how your producer is configured. The producer maintains an
internal buffer of messages to be sent over to the broker. Is it possible
you are terminating the producer code in your test before the buffer is
exhausted?

On Tue, Apr 18, 2017 at 5:29 PM, jan  wrote:

> Thanks to both of you. Some quick points:
>
> I'd expect there to be backpressure from the producer if the broker is
> busy ie. the broker would not respond to the console producer if the
> broker was too busy accept more messages, and the producer would hang
> on the socket. Alternatively I'd hope the console producer would have
> the sense to back off and retry but clearly(?) not.
> This behaviour is actually relevant to my old job so I need to know more.
>
> Perhaps the timeout mentioned in the error msg can just be upped?
>
> *Is* the claimed timeout relevant?
> > Batch containing 8 record(s) expired due to timeout while requesting
> metadata from brokers for big_ptns1_repl1_nozip-0
>
> Why is the producer expiring records?
>
> But I'm surprised this happened because my setup is one machine with
> everything running on it. No network. Also Kafka writes to the disk
> without an fsync (or its equivalent on windows) which means it just
> gets cached in ram before being lazily written to disk, and I've got
> plenty of ram - 16GB ram vs 5GB of input file. Kafka adds its overhead
> so it grows to ~8GB but still, it need not hit disk at all (and the
> file goes into the windows memory, not java's).
> Maybe it is GC holding things up but I dunno, GC even for a second or
> two should not cause a socket failure, just delay the read, though I'm
> not an expert on this *at all*.
>
> I'll go over the answers tomorrow more carefully but thanks anyway!
>
> cheers
>
> jan
>
> On 18/04/2017, Serega Sheypak  wrote:
> >> err, isn't it supposed to? Isn't the loss of data a very serious error?
> > Kafka can't fix networking issues like latencies, blinking,
> unavailability
> > or any other weird stuff. Kafka promises you to persist data if data
> > reaches Kafka. Data delivery responsibility to kafka is on your side. 
You
> > fail to do it according to logs.
> >
> > 0.02% not 2%
> > You should check broker logs to figure out what went wrong. All things
> > happen on one machine as far as I understand. Maybe your brokers don't
> have
> > enough mem and they stuck because of GC and don't respond to producer.
> > Async producer fails to send data. That is why you observe data loss on
> > consumer side.
> >
> >
> > 2017-04-18 23:32 GMT+02:00 jan :
> >
> >> Hi Serega,
> >>
> >> > data didn't reach producer. So why should data appear in consumer?
> >>
> >> err, isn't it supposed to? Isn't the loss of data a very serious error?
> >>
> >> > loss rate is more or less similar [...] Not so bad.
> >>
> >> That made me laugh at least.  Is kafka intended to be a reliable
> >> message delivery system, or is a 2% data loss officially acceptable?
> >>
> >> I've been reading the other threads and one says windows is really not
> >> supported, and certainly not for production. Perhaps that's the root
> >> of it. Well I'm hoping to try it on linux shortly so I'll see if I can
> >> replicate the issue but I would like to know whether it *should* work
> >> in windows.
> >>
> >> cheers
> >>
> >> jan
> >>
> >> On 18/04/2017, Serega Sheypak  wrote:
> >> > Hi,
> >> >
> >> > [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> >> > big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
> >> > (org.apache.kafka.clients.
> >> > producer.internals.ErrorLoggingCallback)
> >> > org.apache.kafka.common.errors.TimeoutException: Batch containing 8
> >> > record(s) expired due to timeout while requesting metadata from
> >> > brokers for big_ptns1_repl1_nozip-0
> >> >
> >> > data didn't reach producer. So why should data appear in consumer?
> >> > loss rate is more or less similar : 0.02 (130k / 5400mb) ~ 0.03%
> (150mb

Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread Robert Quinlivan
I am curious how your producer is configured. The producer maintains an
internal buffer of messages to be sent over to the broker. Is it possible
you are terminating the producer code in your test before the buffer is
exhausted?
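
If the test drives the Java producer directly, a minimal sketch of draining 
that internal buffer before the process exits (host and topic names are 
placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DrainBeforeExit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 0; i < 100000; i++) {
                producer.send(new ProducerRecord<>("test-topic", "line " + i));
            }
            producer.flush(); // block until every buffered record has been sent (or has failed)
        } finally {
            producer.close(); // also waits for any remaining outstanding sends
        }
    }
}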

On Tue, Apr 18, 2017 at 5:29 PM, jan  wrote:

> Thanks to both of you. Some quick points:
>
> I'd expect there to be backpressure from the producer if the broker is
> busy ie. the broker would not respond to the console producer if the
> broker was too busy accept more messages, and the producer would hang
> on the socket. Alternatively I'd hope the console producer would have
> the sense to back off and retry but clearly(?) not.
> This behaviour is actually relevant to my old job so I need to know more.
>
> Perhaps the timeout mentioned in the error msg can just be upped?
>
> *Is* the claimed timeout relevant?
> > Batch containing 8 record(s) expired due to timeout while requesting
> metadata from brokers for big_ptns1_repl1_nozip-0
>
> Why is the producer expiring records?
>
> But I'm surprised this happened because my setup is one machine with
> everything running on it. No network. Also Kafka writes to the disk
> without an fsync (or its equivalent on windows) which means it just
> gets cached in ram before being lazily written to disk, and I've got
> plenty of ram - 16GB ram vs 5GB of input file. Kafka adds its overhead
> so it grows to ~8GB but still, it need not hit disk at all (and the
> file goes into the windows memory, not java's).
> Maybe it is GC holding things up but I dunno, GC even for a second or
> two should not cause a socket failure, just delay the read, though I'm
> not an expert on this *at all*.
>
> I'll go over the answers tomorrow more carefully but thanks anyway!
>
> cheers
>
> jan
>
> On 18/04/2017, Serega Sheypak  wrote:
> >> err, isn't it supposed to? Isn't the loss of data a very serious error?
> > Kafka can't fix networking issues like latencies, blinking,
> unavailability
> > or any other weird stuff. Kafka promises you to persist data if data
> > reaches Kafka. Data delivery responsibility to kafka is on your side. You
> > fail to do it according to logs.
> >
> > 0.02% not 2%
> > You should check broker logs to figure out what went wrong. All things
> > happen on one machine as far as I understand. Maybe your brokers don't
> have
> > enough mem and they stuck because of GC and don't respond to producer.
> > Async producer fails to send data. That is why you observe data loss on
> > consumer side.
> >
> >
> > 2017-04-18 23:32 GMT+02:00 jan :
> >
> >> Hi Serega,
> >>
> >> > data didn't reach producer. So why should data appear in consumer?
> >>
> >> err, isn't it supposed to? Isn't the loss of data a very serious error?
> >>
> >> > loss rate is more or less similar [...] Not so bad.
> >>
> >> That made me laugh at least.  Is kafka intended to be a reliable
> >> message delivery system, or is a 2% data loss officially acceptable?
> >>
> >> I've been reading the other threads and one says windows is really not
> >> supported, and certainly not for production. Perhaps that's the root
> >> of it. Well I'm hoping to try it on linux shortly so I'll see if I can
> >> replicate the issue but I would like to know whether it *should* work
> >> in windows.
> >>
> >> cheers
> >>
> >> jan
> >>
> >> On 18/04/2017, Serega Sheypak  wrote:
> >> > Hi,
> >> >
> >> > [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> >> > big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
> >> > (org.apache.kafka.clients.
> >> > producer.internals.ErrorLoggingCallback)
> >> > org.apache.kafka.common.errors.TimeoutException: Batch containing 8
> >> > record(s) expired due to timeout while requesting metadata from
> >> > brokers for big_ptns1_repl1_nozip-0
> >> >
> >> > data didn't reach producer. So why should data appear in consumer?
> >> > loss rate is more or less similar : 0.02 (130k / 5400mb) ~ 0.03%
> (150mb
> >> > /
> >> > 5000gb) Not so bad.
> >> >
> >> >
> >> > 2017-04-18 21:46 GMT+02:00 jan :
> >> >
> >> >> Hi all, I'm something of a kafka n00b.
> >> >> I posted the following in the  google newsgroup, haven't had a reply
> >> >> or even a single read so I'll try here. My original msg, slightly
> >> >> edited, was:
> >> >>
> >> >> 
> >> >>
> >> >> (windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
> >> >> server, latest version of java)
> >> >>
> >> >> I've spent several days trying to sort out unexpected behaviour
> >> >> involving kafka and the kafka console producer and consumer.
> >> >>
> >> >>  If I set  the console produced and console consumer to look at the
> >> >> same topic then I can type lines into the producer window and see
> them
> >> >> appear in the consumer window, so it works.
> >> >>
> >> >> If I try to pipe in large amounts of data to the producer, some gets
> >> >> lost and the producer reports errors eg.
> >> >>
> >> >> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> >> >> big_pt

Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread jan
Thanks to both of you. Some quick points:

I'd expect there to be backpressure from the producer if the broker is
busy, i.e. the broker would not respond to the console producer if it
was too busy to accept more messages, and the producer would hang
on the socket. Alternatively I'd hope the console producer would have
the sense to back off and retry, but clearly(?) it does not.
This behaviour is actually relevant to my old job so I need to know more.

Perhaps the timeout mentioned in the error msg can just be upped?

*Is* the claimed timeout relevant?
> Batch containing 8 record(s) expired due to timeout while requesting metadata 
> from brokers for big_ptns1_repl1_nozip-0

Why is the producer expiring records?

But I'm surprised this happened because my setup is one machine with
everything running on it. No network. Also Kafka writes to the disk
without an fsync (or its equivalent on windows) which means it just
gets cached in ram before being lazily written to disk, and I've got
plenty of ram - 16GB ram vs 5GB of input file. Kafka adds its overhead
so it grows to ~8GB but still, it need not hit disk at all (and the
file goes into the windows memory, not java's).
Maybe it is GC holding things up but I dunno, GC even for a second or
two should not cause a socket failure, just delay the read, though I'm
not an expert on this *at all*.

I'll go over the answers tomorrow more carefully but thanks anyway!

cheers

jan

On 18/04/2017, Serega Sheypak  wrote:
>> err, isn't it supposed to? Isn't the loss of data a very serious error?
> Kafka can't fix networking issues like latencies, blinking, unavailability
> or any other weird stuff. Kafka promises you to persist data if data
> reaches Kafka. Data delivery responsibility to kafka is on your side. You
> fail to do it according to logs.
>
> 0.02% not 2%
> You should check broker logs to figure out what went wrong. All things
> happen on one machine as far as I understand. Maybe your brokers don't have
> enough mem and they stuck because of GC and don't respond to producer.
> Async producer fails to send data. That is why you observe data loss on
> consumer side.
>
>
> 2017-04-18 23:32 GMT+02:00 jan :
>
>> Hi Serega,
>>
>> > data didn't reach producer. So why should data appear in consumer?
>>
>> err, isn't it supposed to? Isn't the loss of data a very serious error?
>>
>> > loss rate is more or less similar [...] Not so bad.
>>
>> That made me laugh at least.  Is kafka intended to be a reliable
>> message delivery system, or is a 2% data loss officially acceptable?
>>
>> I've been reading the other threads and one says windows is really not
>> supported, and certainly not for production. Perhaps that's the root
>> of it. Well I'm hoping to try it on linux shortly so I'll see if I can
>> replicate the issue but I would like to know whether it *should* work
>> in windows.
>>
>> cheers
>>
>> jan
>>
>> On 18/04/2017, Serega Sheypak  wrote:
>> > Hi,
>> >
>> > [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
>> > big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
>> > (org.apache.kafka.clients.
>> > producer.internals.ErrorLoggingCallback)
>> > org.apache.kafka.common.errors.TimeoutException: Batch containing 8
>> > record(s) expired due to timeout while requesting metadata from
>> > brokers for big_ptns1_repl1_nozip-0
>> >
>> > data didn't reach producer. So why should data appear in consumer?
>> > loss rate is more or less similar : 0.02 (130k / 5400mb) ~ 0.03% (150mb
>> > /
>> > 5000gb) Not so bad.
>> >
>> >
>> > 2017-04-18 21:46 GMT+02:00 jan :
>> >
>> >> Hi all, I'm something of a kafka n00b.
>> >> I posted the following in the  google newsgroup, haven't had a reply
>> >> or even a single read so I'll try here. My original msg, slightly
>> >> edited, was:
>> >>
>> >> 
>> >>
>> >> (windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
>> >> server, latest version of java)
>> >>
>> >> I've spent several days trying to sort out unexpected behaviour
>> >> involving kafka and the kafka console producer and consumer.
>> >>
>> >>  If I set  the console produced and console consumer to look at the
>> >> same topic then I can type lines into the producer window and see them
>> >> appear in the consumer window, so it works.
>> >>
>> >> If I try to pipe in large amounts of data to the producer, some gets
>> >> lost and the producer reports errors eg.
>> >>
>> >> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
>> >> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
>> >> (org.apache.kafka.clients.
>> >> producer.internals.ErrorLoggingCallback)
>> >> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
>> >> record(s) expired due to timeout while requesting metadata from
>> >> brokers for big_ptns1_repl1_nozip-0
>> >>
>> >> I'm using as input a file either shakespeare's full works (about 5.4
>> >> meg ascii), or a much larger file of shakespear's full works
>> >> replicated 900 tim

Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread Robert Quinlivan
Kafka can be tuned for greater delivery guarantees, but such guarantees
come at the cost of latency and throughput (as they do in many other such
systems). If you are doing a simple end-to-end test you may want to look at
tuning the "acks" configuration setting to ensure you aren't dropping any
messages during the test.
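
As a sketch of that kind of end-to-end test with the Java producer: acks=all 
plus a send callback, so failed or dropped records are counted instead of 
disappearing silently (host and topic names are placeholders):

import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CountingTestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("acks", "all"); // a record counts as sent only once the broker acknowledges it
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        final AtomicLong failures = new AtomicLong();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000000; i++) {
                producer.send(new ProducerRecord<>("test-topic", "line " + i), new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            failures.incrementAndGet(); // this record never made it to the broker
                        }
                    }
                });
            }
            producer.flush();
        }
        System.out.println("failed sends: " + failures.get());
    }
}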

On Tue, Apr 18, 2017 at 5:02 PM, Serega Sheypak 
wrote:

> > err, isn't it supposed to? Isn't the loss of data a very serious error?
> Kafka can't fix networking issues like latencies, blinking, unavailability
> or any other weird stuff. Kafka promises you to persist data if data
> reaches Kafka. Data delivery responsibility to kafka is on your side. You
> fail to do it according to logs.
>
> 0.02% not 2%
> You should check broker logs to figure out what went wrong. All things
> happen on one machine as far as I understand. Maybe your brokers don't have
> enough mem and they stuck because of GC and don't respond to producer.
> Async producer fails to send data. That is why you observe data loss on
> consumer side.
>
>
> 2017-04-18 23:32 GMT+02:00 jan :
>
> > Hi Serega,
> >
> > > data didn't reach producer. So why should data appear in consumer?
> >
> > err, isn't it supposed to? Isn't the loss of data a very serious error?
> >
> > > loss rate is more or less similar [...] Not so bad.
> >
> > That made me laugh at least.  Is kafka intended to be a reliable
> > message delivery system, or is a 2% data loss officially acceptable?
> >
> > I've been reading the other threads and one says windows is really not
> > supported, and certainly not for production. Perhaps that's the root
> > of it. Well I'm hoping to try it on linux shortly so I'll see if I can
> > replicate the issue but I would like to know whether it *should* work
> > in windows.
> >
> > cheers
> >
> > jan
> >
> > On 18/04/2017, Serega Sheypak  wrote:
> > > Hi,
> > >
> > > [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> > > big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
> > > (org.apache.kafka.clients.
> > > producer.internals.ErrorLoggingCallback)
> > > org.apache.kafka.common.errors.TimeoutException: Batch containing 8
> > > record(s) expired due to timeout while requesting metadata from
> > > brokers for big_ptns1_repl1_nozip-0
> > >
> > > data didn't reach producer. So why should data appear in consumer?
> > > loss rate is more or less similar : 0.02 (130k / 5400mb) ~ 0.03%
> (150mb /
> > > 5000gb) Not so bad.
> > >
> > >
> > > 2017-04-18 21:46 GMT+02:00 jan :
> > >
> > >> Hi all, I'm something of a kafka n00b.
> > >> I posted the following in the  google newsgroup, haven't had a reply
> > >> or even a single read so I'll try here. My original msg, slightly
> > >> edited, was:
> > >>
> > >> 
> > >>
> > >> (windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
> > >> server, latest version of java)
> > >>
> > >> I've spent several days trying to sort out unexpected behaviour
> > >> involving kafka and the kafka console producer and consumer.
> > >>
> > >>  If I set  the console produced and console consumer to look at the
> > >> same topic then I can type lines into the producer window and see them
> > >> appear in the consumer window, so it works.
> > >>
> > >> If I try to pipe in large amounts of data to the producer, some gets
> > >> lost and the producer reports errors eg.
> > >>
> > >> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> > >> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
> > >> (org.apache.kafka.clients.
> > >> producer.internals.ErrorLoggingCallback)
> > >> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
> > >> record(s) expired due to timeout while requesting metadata from
> > >> brokers for big_ptns1_repl1_nozip-0
> > >>
> > >> I'm using as input a file either shakespeare's full works (about 5.4
> > >> meg ascii), or a much larger file of shakespear's full works
> > >> replicated 900 times to make it about 5GB. Lines are ascii and short,
> > >> and each line should be a single record when read in by the console
> > >> producer. I need to do some benchmarking on time and space and this
> > >> was my first try.
> > >>
> > >> As mentioned, data gets lost. I presume it is expected that any data
> > >> we pipe into the producer should arrive in the consumer, so if I do
> > >> this in one windows console:
> > >>
> > >> kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic
> > >> big_ptns1_repl1_nozip --zookeeper localhost:2181 >
> > >> F:\Users\me\Desktop\shakespear\single_all_shakespear_OUT.txt
> > >>
> > >> and this in another:
> > >>
> > >> kafka-console-producer.bat --broker-list localhost:9092  --topic
> > >> big_ptns1_repl1_nozip <
> > >> F:\Users\me\Desktop\shakespear\complete_works_no_bare_lines.txt
> > >>
> > >> then the output file "single_all_shakespear_OUT.txt" should be
> > >> identical to the input file "complete_works_no_bare_lines.txt" except
> > >> it's not. For t

Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread Serega Sheypak
> err, isn't it supposed to? Isn't the loss of data a very serious error?
Kafka can't fix networking issues like latency, flapping, unavailability,
or any other weird stuff. Kafka promises to persist data once the data
reaches Kafka. Getting the data delivered to Kafka is your side's
responsibility, and according to the logs that delivery is failing.

0.02% not 2%
You should check the broker logs to figure out what went wrong. Everything
runs on one machine as far as I understand. Maybe your brokers don't have
enough memory and they get stuck in GC and don't respond to the producer.
The async producer then fails to send the data; that is why you observe data
loss on the consumer side.


2017-04-18 23:32 GMT+02:00 jan :

> Hi Serega,
>
> > data didn't reach producer. So why should data appear in consumer?
>
> err, isn't it supposed to? Isn't the loss of data a very serious error?
>
> > loss rate is more or less similar [...] Not so bad.
>
> That made me laugh at least.  Is kafka intended to be a reliable
> message delivery system, or is a 2% data loss officially acceptable?
>
> I've been reading the other threads and one says windows is really not
> supported, and certainly not for production. Perhaps that's the root
> of it. Well I'm hoping to try it on linux shortly so I'll see if I can
> replicate the issue but I would like to know whether it *should* work
> in windows.
>
> cheers
>
> jan
>
> On 18/04/2017, Serega Sheypak  wrote:
> > Hi,
> >
> > [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> > big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
> > (org.apache.kafka.clients.
> > producer.internals.ErrorLoggingCallback)
> > org.apache.kafka.common.errors.TimeoutException: Batch containing 8
> > record(s) expired due to timeout while requesting metadata from
> > brokers for big_ptns1_repl1_nozip-0
> >
> > data didn't reach producer. So why should data appear in consumer?
> > loss rate is more or less similar : 0.02 (130k / 5400mb) ~ 0.03% (150mb /
> > 5000gb) Not so bad.
> >
> >
> > 2017-04-18 21:46 GMT+02:00 jan :
> >
> >> Hi all, I'm something of a kafka n00b.
> >> I posted the following in the  google newsgroup, haven't had a reply
> >> or even a single read so I'll try here. My original msg, slightly
> >> edited, was:
> >>
> >> 
> >>
> >> (windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
> >> server, latest version of java)
> >>
> >> I've spent several days trying to sort out unexpected behaviour
> >> involving kafka and the kafka console producer and consumer.
> >>
> >>  If I set  the console produced and console consumer to look at the
> >> same topic then I can type lines into the producer window and see them
> >> appear in the consumer window, so it works.
> >>
> >> If I try to pipe in large amounts of data to the producer, some gets
> >> lost and the producer reports errors eg.
> >>
> >> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> >> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
> >> (org.apache.kafka.clients.
> >> producer.internals.ErrorLoggingCallback)
> >> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
> >> record(s) expired due to timeout while requesting metadata from
> >> brokers for big_ptns1_repl1_nozip-0
> >>
> >> I'm using as input a file either shakespeare's full works (about 5.4
> >> meg ascii), or a much larger file of shakespear's full works
> >> replicated 900 times to make it about 5GB. Lines are ascii and short,
> >> and each line should be a single record when read in by the console
> >> producer. I need to do some benchmarking on time and space and this
> >> was my first try.
> >>
> >> As mentioned, data gets lost. I presume it is expected that any data
> >> we pipe into the producer should arrive in the consumer, so if I do
> >> this in one windows console:
> >>
> >> kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic
> >> big_ptns1_repl1_nozip --zookeeper localhost:2181 >
> >> F:\Users\me\Desktop\shakespear\single_all_shakespear_OUT.txt
> >>
> >> and this in another:
> >>
> >> kafka-console-producer.bat --broker-list localhost:9092  --topic
> >> big_ptns1_repl1_nozip <
> >> F:\Users\me\Desktop\shakespear\complete_works_no_bare_lines.txt
> >>
> >> then the output file "single_all_shakespear_OUT.txt" should be
> >> identical to the input file "complete_works_no_bare_lines.txt" except
> >> it's not. For the complete works (sabout 5.4 meg uncompressed) I lost
> >> about 130K in the output.
> >> For the replicated shakespeare, which is about 5GB, I lost about 150
> meg.
> >>
> >> This can't be right surely and it's repeatable but happens at
> >> different places in the file when errors start to be produced, it
> >> seems.
> >>
> >> I've done this using all 3 versions of kafak in the 0.10.x.y branch
> >> and I get the same problem (the above commands were using the 0.10.0.0
> >> branch so they look a little obsolete but they are right for that
> >> branch I think). It's cost me some days.

Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread David Garcia
Kafka is very reliable when the broker actually gets the message and replies 
back to the producer that it got the message (i.e. it won’t “lie”).  Basically, 
your producer tried to put too many bananas into the Bananer’s basket.  And 
yes, Windows is not supported.  You will get much better performance with a 
Linux deployment.  But as with everything, monitor Kafka (OS metrics such as 
disk scans, etc.) so that you have a sense of how much “capacity” it has.  The 
Confluent Control Center has some of this stuff out of the box, but you really 
should monitor OS metrics as well.  Netflix has a good article on this too: 
http://techblog.netflix.com/2016/04/kafka-inside-keystone-pipeline.html

My team built a simple latency canary app that reports the numbers to New 
Relic; it's very indicative of cluster health.

-David

On 4/18/17, 4:32 PM, "jan"  wrote:

Hi Serega,

> data didn't reach producer. So why should data appear in consumer?

err, isn't it supposed to? Isn't the loss of data a very serious error?

> loss rate is more or less similar [...] Not so bad.

That made me laugh at least.  Is kafka intended to be a reliable
message delivery system, or is a 2% data loss officially acceptable?

I've been reading the other threads and one says windows is really not
supported, and certainly not for production. Perhaps that's the root
of it. Well I'm hoping to try it on linux shortly so I'll see if I can
replicate the issue but I would like to know whether it *should* work
in windows.

cheers

jan

On 18/04/2017, Serega Sheypak  wrote:
> Hi,
>
> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
> (org.apache.kafka.clients.
> producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
> record(s) expired due to timeout while requesting metadata from
> brokers for big_ptns1_repl1_nozip-0
>
> data didn't reach producer. So why should data appear in consumer?
> loss rate is more or less similar : 0.02 (130k / 5400mb) ~ 0.03% (150mb /
> 5000gb) Not so bad.
>
>
> 2017-04-18 21:46 GMT+02:00 jan :
>
>> Hi all, I'm something of a kafka n00b.
>> I posted the following in the  google newsgroup, haven't had a reply
>> or even a single read so I'll try here. My original msg, slightly
>> edited, was:
>>
>> 
>>
>> (windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
>> server, latest version of java)
>>
>> I've spent several days trying to sort out unexpected behaviour
>> involving kafka and the kafka console producer and consumer.
>>
>>  If I set  the console produced and console consumer to look at the
>> same topic then I can type lines into the producer window and see them
>> appear in the consumer window, so it works.
>>
>> If I try to pipe in large amounts of data to the producer, some gets
>> lost and the producer reports errors eg.
>>
>> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
>> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
>> (org.apache.kafka.clients.
>> producer.internals.ErrorLoggingCallback)
>> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
>> record(s) expired due to timeout while requesting metadata from
>> brokers for big_ptns1_repl1_nozip-0
>>
>> I'm using as input a file either shakespeare's full works (about 5.4
>> meg ascii), or a much larger file of shakespear's full works
>> replicated 900 times to make it about 5GB. Lines are ascii and short,
>> and each line should be a single record when read in by the console
>> producer. I need to do some benchmarking on time and space and this
>> was my first try.
>>
>> As mentioned, data gets lost. I presume it is expected that any data
>> we pipe into the producer should arrive in the consumer, so if I do
>> this in one windows console:
>>
>> kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic
>> big_ptns1_repl1_nozip --zookeeper localhost:2181 >
>> F:\Users\me\Desktop\shakespear\single_all_shakespear_OUT.txt
>>
>> and this in another:
>>
>> kafka-console-producer.bat --broker-list localhost:9092  --topic
>> big_ptns1_repl1_nozip <
>> F:\Users\me\Desktop\shakespear\complete_works_no_bare_lines.txt
>>
>> then the output file "single_all_shakespear_OUT.txt" should be
>> identical to the input file "complete_works_no_bare_lines.txt" except
>> it's not. For the complete works (sabout 5.4 meg uncompressed) I lost
>> about 130K in the output.
>> For the replicated shakespeare, which is about 5GB, I lost about 150 meg.
>>
>> This can't be right surely and it

Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread jan
Hi Serega,

> data didn't reach producer. So why should data appear in consumer?

err, isn't it supposed to? Isn't the loss of data a very serious error?

> loss rate is more or less similar [...] Not so bad.

That made me laugh at least.  Is kafka intended to be a reliable
message delivery system, or is a 2% data loss officially acceptable?

I've been reading the other threads and one says windows is really not
supported, and certainly not for production. Perhaps that's the root
of it. Well I'm hoping to try it on linux shortly so I'll see if I can
replicate the issue but I would like to know whether it *should* work
in windows.

cheers

jan

On 18/04/2017, Serega Sheypak  wrote:
> Hi,
>
> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
> (org.apache.kafka.clients.
> producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
> record(s) expired due to timeout while requesting metadata from
> brokers for big_ptns1_repl1_nozip-0
>
> data didn't reach producer. So why should data appear in consumer?
> loss rate is more or less similar : 0.02 (130k / 5400mb) ~ 0.03% (150mb /
> 5000gb) Not so bad.
>
>
> 2017-04-18 21:46 GMT+02:00 jan :
>
>> Hi all, I'm something of a kafka n00b.
>> I posted the following in the  google newsgroup, haven't had a reply
>> or even a single read so I'll try here. My original msg, slightly
>> edited, was:
>>
>> 
>>
>> (windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
>> server, latest version of java)
>>
>> I've spent several days trying to sort out unexpected behaviour
>> involving kafka and the kafka console producer and consumer.
>>
>>  If I set  the console produced and console consumer to look at the
>> same topic then I can type lines into the producer window and see them
>> appear in the consumer window, so it works.
>>
>> If I try to pipe in large amounts of data to the producer, some gets
>> lost and the producer reports errors eg.
>>
>> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
>> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
>> (org.apache.kafka.clients.
>> producer.internals.ErrorLoggingCallback)
>> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
>> record(s) expired due to timeout while requesting metadata from
>> brokers for big_ptns1_repl1_nozip-0
>>
>> I'm using as input a file either shakespeare's full works (about 5.4
>> meg ascii), or a much larger file of shakespear's full works
>> replicated 900 times to make it about 5GB. Lines are ascii and short,
>> and each line should be a single record when read in by the console
>> producer. I need to do some benchmarking on time and space and this
>> was my first try.
>>
>> As mentioned, data gets lost. I presume it is expected that any data
>> we pipe into the producer should arrive in the consumer, so if I do
>> this in one windows console:
>>
>> kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic
>> big_ptns1_repl1_nozip --zookeeper localhost:2181 >
>> F:\Users\me\Desktop\shakespear\single_all_shakespear_OUT.txt
>>
>> and this in another:
>>
>> kafka-console-producer.bat --broker-list localhost:9092  --topic
>> big_ptns1_repl1_nozip <
>> F:\Users\me\Desktop\shakespear\complete_works_no_bare_lines.txt
>>
>> then the output file "single_all_shakespear_OUT.txt" should be
>> identical to the input file "complete_works_no_bare_lines.txt" except
>> it's not. For the complete works (sabout 5.4 meg uncompressed) I lost
>> about 130K in the output.
>> For the replicated shakespeare, which is about 5GB, I lost about 150 meg.
>>
>> This can't be right surely and it's repeatable but happens at
>> different places in the file when errors start to be produced, it
>> seems.
>>
>> I've done this using all 3 versions of kafak in the 0.10.x.y branch
>> and I get the same problem (the above commands were using the 0.10.0.0
>> branch so they look a little obsolete but they are right for that
>> branch I think). It's cost me some days.
>> So, am I making a mistake, if so what?
>>
>> thanks
>>
>> jan
>>
>


Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread Serega Sheypak
Hi,

[2017-04-17 18:14:05,868] ERROR Error when sending message to topic
big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
(org.apache.kafka.clients.
producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 8
record(s) expired due to timeout while requesting metadata from
brokers for big_ptns1_repl1_nozip-0

The data didn't reach the broker (the batches expired in the producer before
they could be sent out), so why should it appear in the consumer?
The loss rate is roughly the same in both runs: ~2.4% (130 KB / 5.4 MB) vs
~3% (150 MB / 5 GB). Not so bad.
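
For anyone hitting the same loss: the console producer sends asynchronously,
and its error-logging callback only logs expired batches, so those records are
simply gone. Below is a minimal sketch of a standalone producer that surfaces
every failure instead of dropping records silently (the topic and file names
are the ones from the commands quoted below; the property values are
illustrative, not a recommendation):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SafeProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");               // wait for the broker to confirm
        props.put("retries", 10);               // retry transient failures
        props.put("request.timeout.ms", 60000); // give slow metadata more time

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             java.io.BufferedReader in = new java.io.BufferedReader(
                     new java.io.FileReader("complete_works_no_bare_lines.txt"))) {
            String line;
            while ((line = in.readLine()) != null) {
                producer.send(new ProducerRecord<>("big_ptns1_repl1_nozip", line),
                        (metadata, exception) -> {
                            if (exception != null) {
                                // a lost record shows up here instead of vanishing
                                System.err.println("send failed: " + exception);
                            }
                        });
            }
            producer.flush();
        }
    }
}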


2017-04-18 21:46 GMT+02:00 jan :

> Hi all, I'm something of a kafka n00b.
> I posted the following in the  google newsgroup, haven't had a reply
> or even a single read so I'll try here. My original msg, slightly
> edited, was:
>
> 
>
> (windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
> server, latest version of java)
>
> I've spent several days trying to sort out unexpected behaviour
> involving kafka and the kafka console producer and consumer.
>
> If I set the console producer and console consumer to look at the
> same topic then I can type lines into the producer window and see them
> appear in the consumer window, so it works.
>
> If I try to pipe in large amounts of data to the producer, some gets
> lost and the producer reports errors eg.
>
> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
> (org.apache.kafka.clients.
> producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
> record(s) expired due to timeout while requesting metadata from
> brokers for big_ptns1_repl1_nozip-0
>
> I'm using as input a file either shakespeare's full works (about 5.4
> meg ascii), or a much larger file of shakespeare's full works
> replicated 900 times to make it about 5GB. Lines are ascii and short,
> and each line should be a single record when read in by the console
> producer. I need to do some benchmarking on time and space and this
> was my first try.
>
> As mentioned, data gets lost. I presume it is expected that any data
> we pipe into the producer should arrive in the consumer, so if I do
> this in one windows console:
>
> kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic
> big_ptns1_repl1_nozip --zookeeper localhost:2181 >
> F:\Users\me\Desktop\shakespear\single_all_shakespear_OUT.txt
>
> and this in another:
>
> kafka-console-producer.bat --broker-list localhost:9092  --topic
> big_ptns1_repl1_nozip <
> F:\Users\me\Desktop\shakespear\complete_works_no_bare_lines.txt
>
> then the output file "single_all_shakespear_OUT.txt" should be
> identical to the input file "complete_works_no_bare_lines.txt" except
> it's not. For the complete works (about 5.4 meg uncompressed) I lost
> about 130K in the output.
> For the replicated shakespeare, which is about 5GB, I lost about 150 meg.
>
> Surely this can't be right. It is repeatable, though the point in the file
> where the errors start to appear seems to vary between runs.
>
> I've done this using all 3 versions of kafka in the 0.10.x.y branch
> and I get the same problem (the above commands were using the 0.10.0.0
> branch so they look a little obsolete but they are right for that
> branch I think). It's cost me some days.
> So, am I making a mistake, if so what?
>
> thanks
>
> jan
>


possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread jan
Hi all, I'm something of a kafka n00b.
I posted the following in the  google newsgroup, haven't had a reply
or even a single read so I'll try here. My original msg, slightly
edited, was:



(windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
server, latest version of java)

I've spent several days trying to sort out unexpected behaviour
involving kafka and the kafka console producer and consumer.

If I set the console producer and console consumer to look at the
same topic then I can type lines into the producer window and see them
appear in the consumer window, so it works.

If I try to pipe in large amounts of data to the producer, some gets
lost and the producer reports errors eg.

[2017-04-17 18:14:05,868] ERROR Error when sending message to topic
big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
(org.apache.kafka.clients.
producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 8
record(s) expired due to timeout while requesting metadata from
brokers for big_ptns1_repl1_nozip-0

I'm using as input a file either shakespeare's full works (about 5.4
meg ascii), or a much larger file of shakespeare's full works
replicated 900 times to make it about 5GB. Lines are ascii and short,
and each line should be a single record when read in by the console
producer. I need to do some benchmarking on time and space and this
was my first try.

As mentioned, data gets lost. I presume it is expected that any data
we pipe into the producer should arrive in the consumer, so if I do
this in one windows console:

kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic
big_ptns1_repl1_nozip --zookeeper localhost:2181 >
F:\Users\me\Desktop\shakespear\single_all_shakespear_OUT.txt

and this in another:

kafka-console-producer.bat --broker-list localhost:9092  --topic
big_ptns1_repl1_nozip <
F:\Users\me\Desktop\shakespear\complete_works_no_bare_lines.txt

then the output file "single_all_shakespear_OUT.txt" should be
identical to the input file "complete_works_no_bare_lines.txt" except
it's not. For the complete works (about 5.4 meg uncompressed) I lost
about 130K in the output.
For the replicated shakespeare, which is about 5GB, I lost about 150 meg.

Surely this can't be right. It is repeatable, though the point in the file
where the errors start to appear seems to vary between runs.

I've done this using all 3 versions of kafka in the 0.10.x.y branch
and I get the same problem (the above commands were using the 0.10.0.0
branch so they look a little obsolete but they are right for that
branch I think). It's cost me some days.
So, am I making a mistake, if so what?

thanks

jan


Permission to use images

2017-04-18 Thread Dean Morin
Hi,

I'm doing a talk on writing ETL jobs using Kafka. Since many in the
audience won't be familiar with Kafka, can I use some of the images from
https://kafka.apache.org/intro (with a link in the slide) in my
presentation?

-- 
*Dean Morin* |
*Senior Data Engineer*

*dean.mo...@fundingcircle.com * | 415.395.6972
(c)

747 Front St, 4th Fl | San Francisco, CA 94111

*Our Mission: To build a better financial world*




Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

2017-04-18 Thread Mahendra Kariya
Yeah. Quite possible. Completely missed this possibility. What I simply did
was to download and add the kafka-streams jar as a dependency. I didn't
update the downstream dependencies. My bad!
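
A quick way to confirm this kind of mismatch (a hypothetical helper, not part
of my actual code) is to print which jars the conflicting classes are loaded
from; a kafka-streams / kafka-clients version mix-up shows up immediately:

public class ClasspathCheck {
    public static void main(String[] args) {
        Class<?>[] classes = {
                org.apache.kafka.clients.Metadata.class,
                org.apache.kafka.streams.KafkaStreams.class
        };
        for (Class<?> c : classes) {
            // location of the jar (or directory) each class was loaded from
            System.out.println(c.getName() + " -> "
                    + c.getProtectionDomain().getCodeSource().getLocation());
        }
    }
}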

On Tue, Apr 18, 2017 at 7:42 PM, Eno Thereska 
wrote:

> Hi Mahendra,
>
> I see the java.lang.NoSuchMethodError: org.apache.kafka.clients... error.
> Looks like some jars aren't in the classpath?
>
> Eno
>
> > On 18 Apr 2017, at 12:46, Mahendra Kariya 
> wrote:
> >
> > Hey Eno,
> >
> > I just pulled the latest jar from the link you shared and tried to run my
> > code. I am getting the following exception on new KafkaStreams(). The
> same
> > code is working fine with 0.10.2.0 jar.
> >
> >
> > Exception in thread "main" org.apache.kafka.common.KafkaException:
> Failed
> > to construct kafka consumer
> >at org.apache.kafka.clients.consumer.KafkaConsumer.(
> > KafkaConsumer.java:717)
> >at org.apache.kafka.clients.consumer.KafkaConsumer.(
> > KafkaConsumer.java:566)
> >at org.apache.kafka.streams.processor.internals.
> > DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.
> java:38)
> >at org.apache.kafka.streams.processor.internals.
> StreamThread.(
> > StreamThread.java:316)
> >at org.apache.kafka.streams.KafkaStreams.(
> > KafkaStreams.java:358)
> >at org.apache.kafka.streams.KafkaStreams.(
> > KafkaStreams.java:279)
> > Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.
> > Metadata.update(Lorg/apache/kafka/common/Cluster;Ljava/util/Set;J)V
> >at org.apache.kafka.streams.processor.internals.
> > StreamsKafkaClient.(StreamsKafkaClient.java:98)
> >at org.apache.kafka.streams.processor.internals.
> > StreamsKafkaClient.(StreamsKafkaClient.java:82)
> >at org.apache.kafka.streams.processor.internals.
> > StreamPartitionAssignor.configure(StreamPartitionAssignor.java:219)
> >at org.apache.kafka.common.config.AbstractConfig.
> > getConfiguredInstances(AbstractConfig.java:254)
> >at org.apache.kafka.common.config.AbstractConfig.
> > getConfiguredInstances(AbstractConfig.java:220)
> >at org.apache.kafka.clients.consumer.KafkaConsumer.(
> > KafkaConsumer.java:673)
> >... 6 more
> >
> >
> >
> > On Tue, Apr 18, 2017 at 5:47 AM, Mahendra Kariya <
> mahendra.kar...@go-jek.com
> >> wrote:
> >
> >> Thanks!
> >>
> >> On Tue, Apr 18, 2017, 12:26 AM Eno Thereska 
> >> wrote:
> >>
> >>> The RC candidate build is here: http://home.apache.org/~
> >>> gwenshap/kafka-0.10.2.1-rc1/  >>> gwenshap/kafka-0.10.2.1-rc1/>
> >>>
> >>> Eno
>  On 17 Apr 2017, at 17:20, Mahendra Kariya  >
> >>> wrote:
> 
>  Thanks!
> 
>  In the meantime, is the jar published somewhere on github or as a part
> >>> of
>  build pipeline?
> 
>  On Mon, Apr 17, 2017 at 9:18 PM, Eno Thereska  >
>  wrote:
> 
> > Not yet, but as soon as 0.10.2 is voted it should be. Hopefully this
> >>> week.
> >
> > Eno
> >> On 17 Apr 2017, at 13:25, Mahendra Kariya <
> mahendra.kar...@go-jek.com
> 
> > wrote:
> >>
> >> Are the bug fix releases published to Maven central repo?
> >>
> >> On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska <
> eno.there...@gmail.com
> 
> >> wrote:
> >>
> >>> Hi Sachin,
> >>>
> >>> In the bug fix release for 0.10.2 (and in trunk) we have now set
> >>> max.poll.interval to infinite since from our experience with
> streams
> > this
> >>> should not be something that users set: https://github.com/apache/
> >>> kafka/pull/2770/files  >>> kafka/pull/2770/files
> >> .
> >>>
> >>> We're in the process of documenting that change. For now you can
> > increase
> >>> the request timeout without worrying about max.poll.interval
> >>> anymore. In
> >>> fact I'd suggest you also increase max.poll.interval as we've done
> it
> > above.
> >>>
> >>> Thanks
> >>> Eno
> >>>
>  On 1 Apr 2017, at 03:28, Sachin Mittal 
> wrote:
> 
>  Should this timeout be less than max poll interval value? if yes
> >>> than
>  generally speaking what should be the ratio between two or range
> for
> > this
>  timeout value .
> 
>  Thanks
>  Sachin
> 
>  On 1 Apr 2017 04:57, "Matthias J. Sax" 
> >>> wrote:
> 
>  Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
> 
> 
>  -Matthias
> 
> 
>  On 3/31/17 11:32 AM, Sachin Mittal wrote:
> > Hi,
> > So I have added the config ProducerConfig.RETRIES_CONFIG,
>  Integer.MAX_VALUE
> > and the NotLeaderForPartitionException is gone.
> >
> > However we see a new exception especially under heavy load:
> > org.apache.kafka.streams.errors.StreamsException: task [0_1]
> > exception
> > caught when pro

[VOTE] 0.10.2.1 RC2

2017-04-18 Thread Gwen Shapira
Hello Kafka users, developers and client-developers,

This is the third candidate for release of Apache Kafka 0.10.2.1.

It is a bug fix release, so we have lots of bug fixes, some super
important.

Release notes for the 0.10.2.1 release:
http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc2/RELEASE_NOTES.html

*** Please download, test and vote by Friday, 8am PST. ***

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc2/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/

* Javadoc:
http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc2/javadoc/

* Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.1 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=dea3da5b31cc310974685a8bbccc34a2ec2ac5c8


* Documentation:
http://kafka.apache.org/0102/documentation.html

* Protocol:
http://kafka.apache.org/0102/protocol.html

/**

Your help in validating this bugfix release is super valuable, so
please take the time to test and vote!

Suggested tests:
 * Grab the source archive and make sure it compiles
 * Grab one of the binary distros and run the quickstarts against them
 * Extract and verify one of the site docs jars
 * Build a sample against jars in the staging repo
 * Validate GPG signatures on at least one file
 * Validate the javadocs look ok
 * The 0.10.2 documentation was updated for this bugfix release
(especially upgrade, streams and connect portions) - please make sure
it looks ok: http://kafka.apache.org/documentation.html

Thanks,

Gwen


Re: Kafka Streams - Join synchronization issue

2017-04-18 Thread Damian Guy
Hi Marco,

With regard to your JoinWindows: I think you'll want to increase the until()
time, as this is roughly how long records are kept in the local state store
that is used for joins (based on event time). So if you have data arriving
on different streams at varying rates, the event time of one of your streams
is likely advancing faster than that of the other. In your current code, if
one of the streams is ahead of the other by more than 5 minutes, there is a
good chance the data from the faster stream has already been dropped from the
local store, so the join won't happen.
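
As a sketch only (the retention value is hypothetical): keep the 1-second join
window but raise until() so records survive the expected event-time skew
between the two streams, e.g.

import org.apache.kafka.streams.kstream.JoinWindows;

public class JoinWindowSketch {
    // 1-second join window, but records retained for one hour instead of 5 minutes
    static final JoinWindows WINDOWS = JoinWindows.of(1000).until(60 * 60 * 1000L);
}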

It is pretty hard to tell what is going on with the SessionWindows, but
each time a new event is processed both the initializer and aggregator will
run. Additionally the merger will run for any sessions that need merging.
So there should be the same number of calls to the initializer and
aggregator. It might be worth taking a few thread dumps to try and work out
what it is doing.
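
If it is easier than attaching jstack, a dump can also be printed from inside
the app (plain JDK call, sketch only):

import java.util.Map;

public class DumpThreads {
    public static void dump() {
        // print every live thread and its current stack
        for (Map.Entry<Thread, StackTraceElement[]> e :
                Thread.getAllStackTraces().entrySet()) {
            System.out.println(e.getKey());
            for (StackTraceElement frame : e.getValue()) {
                System.out.println("    at " + frame);
            }
        }
    }
}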

Thanks,
Damian


On Tue, 18 Apr 2017 at 14:01 Marco Abitabile 
wrote:

> Hello Eno,
>
> yes it is 1-second width (I unintentionally wrote 1-minute, in my previous
> mail).
> Just to give you some more info about the data: location data has 1-second
> resolution, while user activity data arrives at a varying rate; within a
> single second there may be anywhere from 0 to 10-100 records.
>
> In the meantime I tried to start my reprocessing stage by syncing the two
> streams (location data and User data).
> Apparently it works and joins are performed continuously.
> What I'm experiencing now is that after the join I have a session window
> aggregation that is creating new sessions at very high rate. Right now the
> stream app has ingested 6k activity data records, however, more than 500k
> session windows have been created. Among this 500k sessions, around 100
> (one-hundred) have been aggregated with MySession::aggregateSessions
> function.
> also the cpu is 100% used.
> To the same stream app, at the beginning of March, Damian Guy found an
> issue (related with caching and session store) that he managed to fix right
> after. Right now i'm using the trunk version of kafka 0.10.2.0.
>
> the complete code is as follow:
>
>
> //properties
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "UserSessionWithLocation");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> MySessionSerde.class.getName());
> props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
> 8640);
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
> 100);
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
> 3);
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
> "earliest");
>
> //Other Stream: User Location, is a string with the name of the city the
> //user is (like "San Francisco")
>
> KStreamBuilder builder = new KStreamBuilder();
> KStream userLocationStream = locationStreamBuilder
> .stream(stringSerde, stringSerde,"userLocationStreamData");
> KStream locationKstream = userLocationStream
> .map(MyStreamUtils::enhanceWithAreaDetails);
> locationKstream.to("user_location");
>
> //This Stream: User Activity
> KStream activity = builder.stream(stringSerde, jsonSerde, 
> "activityStreamData");
> activity.filter(MyStreamUtils::filterOutFakeUsers)
> .map(MyStreamUtils::enhanceWithScoreDetails)
> .join(
> locationKstream,
> MyStreamUtils::locationActivityJoiner,
> JoinWindows.of(1000).until(1000 * 60 * 5),
> stringSerde, jsonSerde, stringSerde)
> .through("usersWithLocation")
>
> .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
> .groupByKey(stringSerde, jsonSerde)
> .aggregate(
> MySession::new,
> MySession::aggregateSessions,
> MySession::mergeSessions,
> SessionWindows
> .with(WINDOW_INACTIVITY_GAPS_MS) //5 minutes
> .until(WINDOW_MAINTAIN_DURATION_MS), // 7 minutes
> "aggregate_store")
> .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);
>
>
> KafkaStreams stream = new KafkaStreams(builder, propsActivity);
> stream.start();
>
>
> Do you see any issue here? Thanks a lot.
> Marco
>
>
> 2017-04-18 13:14 GMT+02:00 Eno Thereska :
>
>> Hi Marco,
>>
>> I noticed your window is 1 second width, not 1 minute width. Is that
>> intentional?
>>
>> Thanks
>>
> Eno
>>
>> On 17 Apr 2017, at 19:41, Marco Abitabile 
>> wrote:
>>
>> hello Eno,
>>
>> thanks for your support. The two streams are both kstreams. The window is of 
>> 1 minute-width until 5 minutes. This is the code:
>>
>>
>> //Other Stream: User Location, is a string with the name of the city the
>> //user is (like "San Francisco")
>>
>> KStreamBuilder builder = new KStreamBuilder();
>> KStream use

Re: MirrorMaker as a Background process

2017-04-18 Thread Manikumar
You can run it as a background process by passing the "-daemon" option to the
kafka-run-class.sh script.
The script then uses nohup to run the process in the background.

ex: kafka-run-class.sh -daemon kafka.tools.MirrorMaker  ... ...

On Tue, Apr 18, 2017 at 6:27 PM, Greenhorn Techie  wrote:

> Hi,
>
> I am still a novice to Kafka, hence this rudimentary question.
>
> When I run the MirrorMaker process from a bash CLI, the prompt doesn't
> return once I run the MirrorMaker command. Does that mean I should only
> run MirrorMaker as a background process if I need to have it running all
> the time? We are backing up Kafka data from one cluster to another using
> MirrorMaker, so it needs to be running all the time once initiated.
>
> I am also prefixing the MirrorMaker command with nohup.
>
> Thanks
>


Re: Kafka Streams - Join synchronization issue

2017-04-18 Thread Marco Abitabile
Hello Eno,

yes it is 1-second width (I unintentionally wrote 1-minute, in my previous
mail).
Just to give you some more info about the data: location data has 1-second
resolution, while user activity data arrives at a varying rate; within a
single second there may be anywhere from 0 to 10-100 records.

In the meantime I tried to start my reprocessing stage by syncing the two
streams (location data and User data).
Apparently it works and joins are performed continuously.
What I'm experiencing now is that after the join I have a session window
aggregation that is creating new sessions at a very high rate. Right now the
stream app has ingested 6k activity data records; however, more than 500k
session windows have been created. Among these 500k sessions, around 100
(one hundred) have been aggregated with the MySession::aggregateSessions
function.
The CPU is also at 100%.
On the same stream app, at the beginning of March, Damian Guy found an issue
(related to caching and the session store) that he managed to fix right
after. Right now I'm using the trunk version of Kafka 0.10.2.0.

The complete code is as follows:


//properties
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "UserSessionWithLocation");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
MySessionSerde.class.getName());
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
8640);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
100);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
3);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
"earliest");

//Other Stream: User Location, is a string with the name of the city the
//user is (like "San Francisco")

KStreamBuilder builder = new KStreamBuilder();
KStream userLocationStream = locationStreamBuilder
.stream(stringSerde, stringSerde,"userLocationStreamData");
KStream locationKstream = userLocationStream
.map(MyStreamUtils::enhanceWithAreaDetails);
locationKstream.to("user_location");

//This Stream: User Activity
KStream activity = builder.stream(stringSerde,
jsonSerde, "activityStreamData");
activity.filter(MyStreamUtils::filterOutFakeUsers)
.map(MyStreamUtils::enhanceWithScoreDetails)
.join(
locationKstream,
MyStreamUtils::locationActivityJoiner,
JoinWindows.of(1000).until(1000 * 60 * 5),
stringSerde, jsonSerde, stringSerde)
.through("usersWithLocation")

.map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
.groupByKey(stringSerde, jsonSerde)
.aggregate(
MySession::new,
MySession::aggregateSessions,
MySession::mergeSessions,
SessionWindows
.with(WINDOW_INACTIVITY_GAPS_MS) //5 minutes
.until(WINDOW_MAINTAIN_DURATION_MS), // 7 minutes
"aggregate_store")
.to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);

KafkaStreams stream = new KafkaStreams(builder, propsActivity);
stream.start();


Do you see any issue here? Thanks a lot.
Marco


2017-04-18 13:14 GMT+02:00 Eno Thereska :

> Hi Marco,
>
> I noticed your window is 1 second width, not 1 minute width. Is that
> intentional?
>
> Thanks
> Eno
>
> On 17 Apr 2017, at 19:41, Marco Abitabile 
> wrote:
>
> hello Eno,
>
> thanks for your support. The two streams are both kstreams. The window is of 
> 1 minute-width until 5 minutes. This is the code:
>
>
> //Other Stream: User Location, is a string with the name of the city the
> //user is (like "San Francisco")
>
> KStreamBuilder builder = new KStreamBuilder();
> KStream userLocationStream = locationStreamBuilder
> .stream(stringSerde, stringSerde,"userLocationStreamData");
> KStream locationKstream = userLocationStream
> .map(MyStreamUtils::enhanceWithAreaDetails);
> locationKstream.to("user_location");
>
> //This Stream: User Activity
> KStream activity = builder.stream(stringSerde, jsonSerde, 
> "activityStreamData");
> activity.filter(MyStreamUtils::filterOutFakeUsers)
> .map(MyStreamUtils::enhanceWithScoreDetails)
> .join(
> locationKstream,
> MyStreamUtils::locationActivityJoiner,
> JoinWindows.of(1000).until(1000 * 60 * 5),
> stringSerde, jsonSerde, stringSerde)
> .to("usersWithLocation")
>
> KafkaStreams stream = new KafkaStreams(builder, propsActivity);
> stream.start();
>
>
> And MyStreamUtils::locationActivityJoiner does:
> public static JsonObject locationActivityJoiner(JsonObject activity, String
> loc) {
> JsonObject join = activity.copy();
> join.put("city" , loc);
> return join;
> }
>
>
> hum... your question is letting me think... are you telling me that since
> both are kstreams, they actually need to be re-streamed in sync?
>
> Thanks a lot.
>
> Marco
>
>
> 2017-04-16 21:45 GMT+02:00

MirrorMaker as a Background process

2017-04-18 Thread Greenhorn Techie
Hi,

I am still a novice to Kafka, hence this rudimentary question.

When I run the MirrorMaker process from a bash CLI, the prompt doesn't
return once I run the MirrorMaker command. Does that mean I should only
run MirrorMaker as a background process if I need to have it running all
the time? We are backing up Kafka data from one cluster to another using
MirrorMaker, so it needs to be running all the time once initiated.

I am also prefixing the MirrorMaker command with nohup.

Thanks


Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

2017-04-18 Thread Eno Thereska
Hi Mahendra,

I see the java.lang.NoSuchMethodError: org.apache.kafka.clients... error. Looks 
like some jars aren't in the classpath?

Eno

> On 18 Apr 2017, at 12:46, Mahendra Kariya  wrote:
> 
> Hey Eno,
> 
> I just pulled the latest jar from the link you shared and tried to run my
> code. I am getting the following exception on new KafkaStreams(). The same
> code is working fine with 0.10.2.0 jar.
> 
> 
> Exception in thread "main" org.apache.kafka.common.KafkaException: Failed
> to construct kafka consumer
>at org.apache.kafka.clients.consumer.KafkaConsumer.(
> KafkaConsumer.java:717)
>at org.apache.kafka.clients.consumer.KafkaConsumer.(
> KafkaConsumer.java:566)
>at org.apache.kafka.streams.processor.internals.
> DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38)
>at org.apache.kafka.streams.processor.internals.StreamThread.(
> StreamThread.java:316)
>at org.apache.kafka.streams.KafkaStreams.(
> KafkaStreams.java:358)
>at org.apache.kafka.streams.KafkaStreams.(
> KafkaStreams.java:279)
> Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.
> Metadata.update(Lorg/apache/kafka/common/Cluster;Ljava/util/Set;J)V
>at org.apache.kafka.streams.processor.internals.
> StreamsKafkaClient.(StreamsKafkaClient.java:98)
>at org.apache.kafka.streams.processor.internals.
> StreamsKafkaClient.(StreamsKafkaClient.java:82)
>at org.apache.kafka.streams.processor.internals.
> StreamPartitionAssignor.configure(StreamPartitionAssignor.java:219)
>at org.apache.kafka.common.config.AbstractConfig.
> getConfiguredInstances(AbstractConfig.java:254)
>at org.apache.kafka.common.config.AbstractConfig.
> getConfiguredInstances(AbstractConfig.java:220)
>at org.apache.kafka.clients.consumer.KafkaConsumer.(
> KafkaConsumer.java:673)
>... 6 more
> 
> 
> 
> On Tue, Apr 18, 2017 at 5:47 AM, Mahendra Kariya > wrote:
> 
>> Thanks!
>> 
>> On Tue, Apr 18, 2017, 12:26 AM Eno Thereska 
>> wrote:
>> 
>>> The RC candidate build is here: http://home.apache.org/~
>>> gwenshap/kafka-0.10.2.1-rc1/ >> gwenshap/kafka-0.10.2.1-rc1/>
>>> 
>>> Eno
 On 17 Apr 2017, at 17:20, Mahendra Kariya 
>>> wrote:
 
 Thanks!
 
 In the meantime, is the jar published somewhere on github or as a part
>>> of
 build pipeline?
 
 On Mon, Apr 17, 2017 at 9:18 PM, Eno Thereska 
 wrote:
 
> Not yet, but as soon as 0.10.2 is voted it should be. Hopefully this
>>> week.
> 
> Eno
>> On 17 Apr 2017, at 13:25, Mahendra Kariya >>> 
> wrote:
>> 
>> Are the bug fix releases published to Maven central repo?
>> 
>> On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska >>> 
>> wrote:
>> 
>>> Hi Sachin,
>>> 
>>> In the bug fix release for 0.10.2 (and in trunk) we have now set
>>> max.poll.interval to infinite since from our experience with streams
> this
>>> should not be something that users set: https://github.com/apache/
>>> kafka/pull/2770/files >> kafka/pull/2770/files
>> .
>>> 
>>> We're in the process of documenting that change. For now you can
> increase
>>> the request timeout without worrying about max.poll.interval
>>> anymore. In
>>> fact I'd suggest you also increase max.poll.interval as we've done it
> above.
>>> 
>>> Thanks
>>> Eno
>>> 
 On 1 Apr 2017, at 03:28, Sachin Mittal  wrote:
 
 Should this timeout be less than max poll interval value? if yes
>>> than
 generally speaking what should be the ratio between two or range for
> this
 timeout value .
 
 Thanks
 Sachin
 
 On 1 Apr 2017 04:57, "Matthias J. Sax" 
>>> wrote:
 
 Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
 
 
 -Matthias
 
 
 On 3/31/17 11:32 AM, Sachin Mittal wrote:
> Hi,
> So I have added the config ProducerConfig.RETRIES_CONFIG,
 Integer.MAX_VALUE
> and the NotLeaderForPartitionException is gone.
> 
> However we see a new exception especially under heavy load:
> org.apache.kafka.streams.errors.StreamsException: task [0_1]
> exception
> caught when producing
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
 checkForException(RecordCollectorImpl.java:119)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at
> org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.flush(
 RecordCollectorImpl.java:127)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]at
> org.apache.kafka.streams.processor.internals.
>>> StreamTask$1.run(StreamTask.
 java:76)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at
> org.ap

Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

2017-04-18 Thread Mahendra Kariya
Hey Eno,

I just pulled the latest jar from the link you shared and tried to run my
code. I am getting the following exception on new KafkaStreams(). The same
code is working fine with 0.10.2.0 jar.


Exception in thread "main" org.apache.kafka.common.KafkaException: Failed
to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(
KafkaConsumer.java:717)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(
KafkaConsumer.java:566)
at org.apache.kafka.streams.processor.internals.
DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38)
at org.apache.kafka.streams.processor.internals.StreamThread.<init>(
StreamThread.java:316)
at org.apache.kafka.streams.KafkaStreams.<init>(
KafkaStreams.java:358)
at org.apache.kafka.streams.KafkaStreams.<init>(
KafkaStreams.java:279)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.
Metadata.update(Lorg/apache/kafka/common/Cluster;Ljava/util/Set;J)V
at org.apache.kafka.streams.processor.internals.
StreamsKafkaClient.<init>(StreamsKafkaClient.java:98)
at org.apache.kafka.streams.processor.internals.
StreamsKafkaClient.<init>(StreamsKafkaClient.java:82)
at org.apache.kafka.streams.processor.internals.
StreamPartitionAssignor.configure(StreamPartitionAssignor.java:219)
at org.apache.kafka.common.config.AbstractConfig.
getConfiguredInstances(AbstractConfig.java:254)
at org.apache.kafka.common.config.AbstractConfig.
getConfiguredInstances(AbstractConfig.java:220)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(
KafkaConsumer.java:673)
... 6 more



On Tue, Apr 18, 2017 at 5:47 AM, Mahendra Kariya  wrote:

> Thanks!
>
> On Tue, Apr 18, 2017, 12:26 AM Eno Thereska 
> wrote:
>
>> The RC candidate build is here: http://home.apache.org/~
>> gwenshap/kafka-0.10.2.1-rc1/ > gwenshap/kafka-0.10.2.1-rc1/>
>>
>> Eno
>> > On 17 Apr 2017, at 17:20, Mahendra Kariya 
>> wrote:
>> >
>> > Thanks!
>> >
>> > In the meantime, is the jar published somewhere on github or as a part
>> of
>> > build pipeline?
>> >
>> > On Mon, Apr 17, 2017 at 9:18 PM, Eno Thereska 
>> > wrote:
>> >
>> >> Not yet, but as soon as 0.10.2 is voted it should be. Hopefully this
>> week.
>> >>
>> >> Eno
>> >>> On 17 Apr 2017, at 13:25, Mahendra Kariya > >
>> >> wrote:
>> >>>
>> >>> Are the bug fix releases published to Maven central repo?
>> >>>
>> >>> On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska > >
>> >>> wrote:
>> >>>
>>  Hi Sachin,
>> 
>>  In the bug fix release for 0.10.2 (and in trunk) we have now set
>>  max.poll.interval to infinite since from our experience with streams
>> >> this
>>  should not be something that users set: https://github.com/apache/
>>  kafka/pull/2770/files > kafka/pull/2770/files
>> >>> .
>> 
>>  We're in the process of documenting that change. For now you can
>> >> increase
>>  the request timeout without worrying about max.poll.interval
>> anymore. In
>>  fact I'd suggest you also increase max.poll.interval as we've done it
>> >> above.
>> 
>>  Thanks
>>  Eno
>> 
>> > On 1 Apr 2017, at 03:28, Sachin Mittal  wrote:
>> >
>> > Should this timeout be less than max poll interval value? if yes
>> than
>> > generally speaking what should be the ratio between two or range for
>> >> this
>> > timeout value .
>> >
>> > Thanks
>> > Sachin
>> >
>> > On 1 Apr 2017 04:57, "Matthias J. Sax" 
>> wrote:
>> >
>> > Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
>> >
>> >
>> > -Matthias
>> >
>> >
>> > On 3/31/17 11:32 AM, Sachin Mittal wrote:
>> >> Hi,
>> >> So I have added the config ProducerConfig.RETRIES_CONFIG,
>> > Integer.MAX_VALUE
>> >> and the NotLeaderForPartitionException is gone.
>> >>
>> >> However we see a new exception especially under heavy load:
>> >> org.apache.kafka.streams.errors.StreamsException: task [0_1]
>> >> exception
>> >> caught when producing
>> >> at
>> >> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
>> > checkForException(RecordCollectorImpl.java:119)
>> >> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >> at
>> >> org.apache.kafka.streams.processor.internals.
>> >> RecordCollectorImpl.flush(
>> > RecordCollectorImpl.java:127)
>> >> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]at
>> >> org.apache.kafka.streams.processor.internals.
>>  StreamTask$1.run(StreamTask.
>> > java:76)
>> >> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >> at
>> >> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
>> > measureLatencyNs(StreamsMetricsImpl.java:188)
>> >> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >> at
>> >> org.apache.kafka.streams.processor.internals.
>>  StreamTask.commit(StreamTask.
>> > java:280)
>> >> ~[kafka

Re: Kafka Streams - Join synchronization issue

2017-04-18 Thread Eno Thereska
Hi Marco,

I noticed your window is 1 second width, not 1 minute width. Is that 
intentional?

Thanks
Eno
> On 17 Apr 2017, at 19:41, Marco Abitabile  wrote:
> 
> hello Eno,
> thanks for your support. The two streams are both kstreams. The window is of 
> 1 minute-width until 5 minutes. This is the code:
> 
> //Other Stream: User Location, is a string with the name of the city the
> //user is (like "San Francisco")
> KStreamBuilder builder = new KStreamBuilder();
> KStream userLocationStream = locationStreamBuilder
> .stream(stringSerde, stringSerde,"userLocationStreamData");
> KStream locationKstream = userLocationStream
> .map(MyStreamUtils::enhanceWithAreaDetails);
> locationKstream.to("user_location");
> //This Stream: User Activity
> KStream activity = builder.stream(stringSerde, jsonSerde, 
> "activityStreamData");
> activity.filter(MyStreamUtils::filterOutFakeUsers)
> .map(MyStreamUtils::enhanceWithScoreDetails)
> .join(
> locationKstream,
> MyStreamUtils::locationActivityJoiner,
> JoinWindows.of(1000).until(1000 * 60 * 5),
> stringSerde, jsonSerde, stringSerde)
> .to("usersWithLocation")
> 
> KafkaStreams stream = new KafkaStreams(builder, propsActivity);
> stream.start();
> 
> 
> And MyStreamUtils::locationActivityJoiner does:
> 
> public static JsonObject locationActivityJoiner(JsonObject activity, String
> loc) {
> JsonObject join = activity.copy();
> join.put("city" , loc);
> return join;
> }
> 
> hum... your question is letting me think... are you telling me that since 
> both are kstreams, they actually need to be re-streamed in sync?
> 
> Thanks a lot.
> 
> Marco
> 
> 
> 2017-04-16 21:45 GMT+02:00 Eno Thereska  >:
> Hi Marco,
> 
> Could you share a bit of your code, or at a minimum provide some info on:
> - is userActivitiesStream and geoDataStream a KStream of KTable?
> - what is the length of "timewindow"?
> 
> Thanks
> Eno
> 
> > On 16 Apr 2017, at 19:44, Marco Abitabile  > > wrote:
> >
> > Hi All!
> >
> > I need a little hint to understand how join works, in regards of stream
> > synchronization.
> >
> > This mail is a bit long, I need to explain the issue I'm facing.
> >
> > *TL-TR: *
> > it seems that join synchonization between stream is not respected as
> > explained here:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamSynchronization
> >  
> > 
> >
> > *The need:*
> > I have historical data residing into some databases, more specifically:
> >  - time series of user activities
> >  - time series of user geo positions
> >
> > *What I do:*
> > since I have a new algorithm I want to try, the historical data has been
> > already pruned by kafka retention policy and I have it into a database.
> > This is what I'm doing:
> >  1- spin up kafka-connect sink that takes historical gps data (let's say,
> > one day of data), ordered by event time, and push them into
> > "HistoricalGpsData" topic. This tasks pushes historical geo data as fast as
> > possible into kafka topic, respecting the original event time.
> >  2- spin up kafka-connect sink that takes historical user activities
> > (let's say, one day of data, the same day of gps data, of course), ordered
> > by event time, and push them into "HistoricalUserActivites" topic. This
> > tasks pushes historical user activities data as fast as possible into kafka
> > topic, respecting the original event time.
> >  3- spin up my new stream processor algorithm
> >
> > As per the nature of the data, I have the quantity of activity data much
> > higher than geo data, thus the task1 pushes all the needed geo data into
> > kafka topic within few minutes (around 10 minutes), while activities data,
> > since has a higher volume, is entirely pushed within 1 hour.
> > --> the two streams are pushed into kafka regardless of their
> > synchronization (however being aware of their nature, as explained above)
> >
> > *What I expect:*
> > Now, what I would expect is that when I perform the join between the two
> > stream:
> >
> >   userActivitiesStream.join(geoDataStrea, timewindow...)
> >
> > the join takes the incoming user activities data and joins with the geo
> > data respecting the given time window.
> > As per the nature of the data, there is always a match (within the given
> > timeWindow) between user activities data with geo data (in fact, when this
> > data arrives in real time, there are no issues at all)
> >
> > So. I expect that the join picks up from the topic the right geo data
> > (recall that geo data is pushed into the topic within 10 minutes) and joins
> > it with the user activities data (recall that user activities data is a
> > stream that takes around 1 hour)
> >
> > *What I get:*
> > What happens is that only 

Consuming from Kafka 8.2.2 from Actor. Can't reuse consumer instance

2017-04-18 Thread Serega Sheypak
Hi, I'm on 0.8.2.2 and can't use the new Consumer API described in "Kafka:
The Definitive Guide". My version of kafka-clients only has stubs for the
consumer methods.
This is why I'm using the "old" high-level consumer API, and it doesn't work
well for me.
I have a single actor that tries to read messages from a single Kafka topic
with a single partition.

Here is my code:

def consume(numberOfEvents: Int, await: Duration = 100.millis):
List[MessageEnvelope] = {
val consumerProperties = new Properties()
consumerProperties.put("zookeeper.connect",
kafkaConfig.zooKeeperConnectString)
consumerProperties.put("group.id", consumerGroup)
consumerProperties.put("auto.offset.reset", "smallest")

val consumer = Consumer.create(new ConsumerConfig(consumerProperties))

try {
  val messageStreams = consumer.createMessageStreams(
Predef.Map(kafkaConfig.topic -> 1),
new DefaultDecoder,
new MessageEnvelopeDecoder)

  val receiveMessageFuture = Future[List[MessageEnvelope]] {
messageStreams(kafkaConfig.topic)
  .flatMap(stream => stream.take(numberOfEvents).map(_.message()))
  }

  Await.result(receiveMessageFuture, await)
} finally {
  consumer.shutdown()
}


It works fine, but I suppose I should reuse the ConsumerConnector instance

val consumer = Consumer.create(new ConsumerConfig(consumerProperties))

rather than create it for each "message poll".
I tried keeping a single instance of the ConsumerConnector and messageStreams
in my singleton consumer actor.
It didn't go well; this exception is thrown:

2017-04-17_20:02:44.236 WARN  MessageEnvelopeConsumer - Error while
consuming messages
kafka.common.MessageStreamsExistException: ZookeeperConsumerConnector
can create message streams at most once
at 
kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:151)
at MessageEnvelopeConsumer.consume(MessageEnvelopeConsumer.scala:47)

at this line:

messageStreams(kafkaConfig.topic)
  .flatMap(stream => stream.take(numberOfEvents).map(_.message()))

Then I tried to reuse only the consumer and create the message streams each
time I poll messages.

That didn't go well either; the exception is:

2017-04-17_20:02:44.236 WARN  MessageEnvelopeConsumer - Error while
consuming messages
kafka.common.MessageStreamsExistException: ZookeeperConsumerConnector
can create message streams at most once
at 
kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:151)
at MessageEnvelopeConsumer.consume(MessageEnvelopeConsumer.scala:47)

The exception is obvious to me, but I don't create two consumer instances. I
have loggers and counters in my test, and I'm 100% sure I do not call

 Consumer.create(new ConsumerConfig(consumerProperties))

twice during the test.
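
The pattern I'm trying to get to is roughly the following (a minimal sketch,
shown in Java against the same 0.8.2 high-level consumer API; topic, group and
ZooKeeper values are placeholders): create the connector and the message
streams exactly once, keep the iterator, and reuse it on every poll.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ReusableHighLevelConsumer {
    private final ConsumerConnector connector;
    private final ConsumerIterator<byte[], byte[]> iterator;

    public ReusableHighLevelConsumer(String zkConnect, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "smallest");
        props.put("consumer.timeout.ms", "100"); // hasNext() gives up after 100 ms
        connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap(topic, 1));
        iterator = streams.get(topic).get(0).iterator(); // created once, reused
    }

    // called repeatedly (e.g. from the actor); never recreates the streams
    public List<byte[]> consume(int maxMessages) {
        java.util.ArrayList<byte[]> out = new java.util.ArrayList<>();
        try {
            while (out.size() < maxMessages && iterator.hasNext()) {
                out.add(iterator.next().message());
            }
        } catch (ConsumerTimeoutException e) {
            // no more messages within consumer.timeout.ms, return what we have
        }
        return out;
    }

    public void shutdown() {
        connector.shutdown();
    }
}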


Kafka Producer - Multiple broker - Data sent to buffer but not in Queue

2017-04-18 Thread Ranjith Anbazhakan
Hi,

I have been testing behavior of multiple broker instances of kafka in same 
machine and facing inconsistent behavior of producer sent records to buffer not 
being available in queue always.

Tried kafka versions:
0.10.2.0
0.10.1.0

Scenario:

1.   Ran two broker instances in same machine. Say broker 1 as initial 
leader, broker 2 as initial follower.

2.   Stopped broker 1. Now broker 2 became leader.

3.   Now producer sends records for a given topic TEST through send() 
method, followed by flush(). Records have to go to Broker 2 logically. No 
error/exception is thrown by code. (So it is assumed data has been sent 
successfully to buffer)

4.   When using command to check the records count for TEST topic in Broker 
2, the sent records are not added to existing records count for that topic in 
queue.

a.   Used command - kafka-run-class.bat kafka.tools.GetOffsetShell 
--broker-list localhost:9094 --topic TEST --time -1 (where TEST is the used 
topic)

NOTE: **Step 4 is not happening always and is inconsistent**. In the scenario 
when it does not work, if Broker 1 is made UP and then made DOWN, records are 
always been available in queue in Broker 2 post doing Step 3.
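
For reference, a fragment for double-checking delivery (a sketch only,
assuming the existing producer is a KafkaProducer<String, String>): flush()
blocks until the batch completes, but with retries=0 a leader change during
the send can fail the batch without any exception on the calling thread; the
failure only surfaces through the Future returned by send(), or via a send
callback.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendAndConfirm {
    // returns normally only if the broker acknowledged the record
    static RecordMetadata sendAndConfirm(KafkaProducer<String, String> producer,
                                         String topic, String value)
            throws InterruptedException, ExecutionException {
        Future<RecordMetadata> ack = producer.send(new ProducerRecord<>(topic, value));
        producer.flush();
        return ack.get(); // throws ExecutionException if the record was not written
    }
}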

Configurations:
Overall Producer configurations: (most are default values)
acks = all
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers =  , 
buffer.memory = 33554432
client.id = producer-1
compression.type = none
connections.max.idle.ms = 54
interceptor.classes = null
key.serializer = class 
org.apache.kafka.common.serialization.StringSerializer
linger.ms = 1
max.block.ms = 6
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 6
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 3
partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 3
retries = 0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 3
value.serializer = class 
org.apache.kafka.common.serialization.StringSerializer

Broker 1: (server.properties)
broker.id=1
port=9091
advertised.host.name=
auto.create.topics.enable=true
default.replication.factor=2
leader.imbalance.check.interval.seconds=20
topic.metadata.refresh.interval.ms=-1

Broker 2: (server1.properties)
broker.id=2
port=9094
advertised.host.name=
auto.create.topics.enable=true
default.replication.factor=2
leader.imbalance.check.interval.seconds=20
topic.metadata.refresh.interval.ms=-1

Do let me know if anyone has faced similar scenarios and why such an issue
might occur. Let me know if any more details are needed.

Thanks,
Ranjith