Getting "Illegal batch type" exception on consumers

2019-01-05 Thread Asaf Mesika
checking is done later
if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
    throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)",
            LegacyRecord.RECORD_OVERHEAD_V0));
if (recordSize > maxMessageSize)
    throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).",
            maxMessageSize));

int batchSize = recordSize + LOG_OVERHEAD;
if (remaining < batchSize)
    return null;

byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);

ByteBuffer batchSlice = buffer.slice();
batchSlice.limit(batchSize);
buffer.position(buffer.position() + batchSize);

if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
    throw new CorruptRecordException("Invalid magic found in record: " + magic);

if (magic > RecordBatch.MAGIC_VALUE_V1)
    return new DefaultRecordBatch(batchSlice);
else
    return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
}


So the stream constructs a DefaultRecordBatch, which later fails since the code tries
to map it to MessageAndOffset but can't do that for a DefaultRecordBatch - I can't
figure out why.

To me it seems like a bug. I've posted a JIRA ticket
<https://issues.apache.org/jira/browse/KAFKA-7769>, but there have been no comments
since Dec 26th, so I thought I'd ping here as well and get some pointers from
the community.

Our current work-around is to restart either the server or the client, and that
solves it.

Thanks!

Asaf Mesika


Re: Failed to update metadata after 5000 ms

2017-05-10 Thread Asaf Mesika
Without the exact log containing the exception it will be hard to help
On Wed, 10 May 2017 at 23:34 IT Consultant <0binarybudd...@gmail.com> wrote:

> Hi All,
>
> Currently , i am running TLS enabled multi-node kafka .
>
> *Version :* 2.11-0.10.1.1
>
> *Scenario :* Whenever producer tries to produce around 10 records at
> once to kafka . *It gets failed to update metadata after 5000 ms error .*
>
> *Server.properties :*
>
> *[image: Inline image 1]*
>
> *Can you guys please help me fix it asap . It's affecting prod instance. *
>
> *Thanks a lot*
>
>
>
>


Re: Apache Kafka integration using Apache Camel

2017-01-11 Thread Asaf Mesika
Don't specify the Kafka dependencies - Camel will bring them in transitively.
Otherwise you are causing a version conflict.
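
(To make that concrete against the pom.xml quoted further down: a sketch of the
Kafka-related section with the explicit Kafka artifacts removed - the versions shown
are the ones from that pom, and Maven then resolves whichever kafka-clients version
camel-kafka 2.17.0 declares.)

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-core</artifactId>
    <version>2.17.0</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-kafka</artifactId>
    <version>2.17.0</version>
</dependency>
<!-- no explicit org.apache.kafka:kafka-clients or kafka_2.11 entries;
     camel-kafka pulls in the Kafka client it was built against -->
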
On Mon, 9 Jan 2017 at 14:20 Kamal C  wrote:

> Can you enable DEBUG logs ? It'll be helpful to debug.
>
> -- Kamal
>
> On Mon, Jan 9, 2017 at 5:37 AM, Gupta, Swati  wrote:
>
> > Hello All,
> >
> > Any help on this would be appreciated.
> > There seems to be no error. Does it look like a version issue?
> >
> > I have updated my pom.xml with the below:
> > <dependency>
> >     <groupId>org.springframework.kafka</groupId>
> >     <artifactId>spring-kafka</artifactId>
> >     <version>1.1.2.BUILD-SNAPSHOT</version>
> > </dependency>
> >
> > <dependency>
> >     <groupId>org.apache.camel</groupId>
> >     <artifactId>camel-kafka</artifactId>
> >     <version>2.17.0</version>
> > </dependency>
> >
> > <dependency>
> >     <groupId>org.apache.kafka</groupId>
> >     <artifactId>kafka-clients</artifactId>
> >     <version>0.10.1.0</version>
> > </dependency>
> > <dependency>
> >     <groupId>org.apache.kafka</groupId>
> >     <artifactId>kafka_2.11</artifactId>
> >     <version>0.10.1.0</version>
> > </dependency>
> >
> > <dependency>
> >     <groupId>org.apache.camel</groupId>
> >     <artifactId>camel-core</artifactId>
> >     <version>2.17.0</version>
> > </dependency>
> >
> > Thanks & Regards
> > Swati
> >
> > -Original Message-
> > From: Gupta, Swati [mailto:swati.gu...@anz.com]
> > Sent: Friday, 6 January 2017 4:01 PM
> > To: users@kafka.apache.org
> > Subject: RE: Apache Kafka integration using Apache Camel
> >
> > Yes, the kafka console consumer displays the message correctly.
> > I also tested the same with a Java application, it works fine. There
> seems
> > to be an issue with Camel route trying to consume.
> >
> > There is no error in the console. But, the logs show as below:
> > kafka.KafkaCamelTestConsumer
> > Connected to the target VM, address: '127.0.0.1:65007', transport:
> > 'socket'
> > PID_IS_UNDEFINED: INFO  DefaultCamelContext - Apache Camel 2.17.0
> > (CamelContext: camel-1) is starting
> > PID_IS_UNDEFINED: INFO  ManagedManagementStrategy - JMX is enabled
> > PID_IS_UNDEFINED: INFO  DefaultTypeConverter - Loaded 183 type converters
> > PID_IS_UNDEFINED: INFO  DefaultRuntimeEndpointRegistry - Runtime endpoint
> > registry is in extended mode gathering usage statistics of all incoming
> and
> > outgoing endpoints (cache limit: 1000)
> > PID_IS_UNDEFINED: INFO  DefaultCamelContext - AllowUseOriginalMessage is
> > enabled. If access to the original message is not needed, then its
> > recommended to turn this option off as it may improve performance.
> > PID_IS_UNDEFINED: INFO  DefaultCamelContext - StreamCaching is not in
> use.
> > If using streams then its recommended to enable stream caching. See more
> > details at http://camel.apache.org/stream-caching.html
> > PID_IS_UNDEFINED: INFO  KafkaConsumer - Starting Kafka consumer
> > PID_IS_UNDEFINED: INFO  ConsumerConfig - ConsumerConfig values:
> > auto.commit.interval.ms = 5000
> > auto.offset.reset = earliest
> > bootstrap.servers = [localhost:9092]
> > check.crcs = true
> > client.id =
> > connections.max.idle.ms = 540000
> > enable.auto.commit = true
> > exclude.internal.topics = true
> > fetch.max.bytes = 52428800
> > fetch.max.wait.ms = 500
> > fetch.min.bytes = 1024
> > group.id = testing
> > heartbeat.interval.ms = 3000
> > interceptor.classes = null
> > key.deserializer = class org.apache.kafka.common.serialization.
> > StringDeserializer
> > max.partition.fetch.bytes = 1048576
> > max.poll.interval.ms = 300000
> > max.poll.records = 500
> > metadata.max.age.ms = 300000
> > metric.reporters = []
> > metrics.num.samples = 2
> > metrics.sample.window.ms = 30000
> > partition.assignment.strategy = [org.apache.kafka.clients.
> > consumer.RangeAssignor]
> > receive.buffer.bytes = 32768
> > reconnect.backoff.ms = 50
> > request.timeout.ms = 40000
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > sasl.kerberos.min.time.before.relogin = 60000
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > sasl.mechanism = GSSAPI
> > security.protocol = PLAINTEXT
> > send.buffer.bytes = 131072
> > session.timeout.ms = 30000
> > ssl.cipher.suites = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > ssl.endpoint.identification.algorithm = null
> > ssl.key.password = null
> > ssl.keymanager.algorithm = SunX509
> > ssl.keystore.location = null
> > ssl.keystore.password = null
> > ssl.keystore.type = JKS
> > ssl.protocol = TLS
> > ssl.provider = null
> > ssl.secure.random.implementation = null
> > ssl.trustmanager.algorithm = PKIX
> > ssl.truststore.location = null
> >

Re: Kafka and zookeeper stores and mesos env

2016-12-08 Thread Asaf Mesika
A bit off-topic - using the Kafka Mesos framework should save you from
handling those questions: https://github.com/mesos/kafka


On Thu, Dec 8, 2016 at 2:33 PM Mike Marzo 
wrote:

If I'm running a 5-node zk cluster and a 3-node kafka cluster in docker on a
mesos/marathon environment, where my zk and broker nodes are all leveraging
local disk on the hosts they are running on, is there any value to the local
data being preserved across restarts?

In other words, when a broker node fails and restarts on the same
machine, does it leverage any of the actual data it has on disk from the
prior life, or is it assumed to start from new, being re-hydrated from the
other in-sync nodes in the running cluster? Same question for
zookeeper...

I'm trying to assess the value of local data preservation across marathon
re-scheduled jobs. I'm thinking the data is blown away, as much would be stale,
but I'm not sure, since log retention could be set quite high and having
history could make re-sync more efficient by effectively only closing the
gap for deltas while down. Anyone know?


Re: Storing Kafka Message JSON to deep storage like S3

2016-12-06 Thread Asaf Mesika
We rolled our own since we couldn't (1.5 years ago) find one. The code is
quite simple and short.
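
(For anyone looking for a starting point, a rough sketch of such a "router": a
dedicated consumer group that appends message values into a batch and hands it to an
uploader. The topic name, flush threshold and uploadBatch body are placeholders for
whatever S3/HDFS client you use.)

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class JsonArchiver {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "s3-archiver"); // its own group, independent of the real consumers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        StringBuilder batch = new StringBuilder();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    batch.append(record.value()).append('\n'); // newline-delimited JSON
                }
                if (batch.length() >= 64 * 1024 * 1024) { // flush roughly every 64 MB
                    uploadBatch(batch.toString());
                    batch.setLength(0);
                }
            }
        }
    }

    private static void uploadBatch(String payload) {
        // Placeholder: put the payload into S3/HDFS under a time-based key,
        // e.g. topic/yyyy/MM/dd/HH/<uuid>.json, using your storage client of choice.
    }
}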


On Tue, Dec 6, 2016 at 1:55 PM Aseem Bansal  wrote:

> I just meant that is there an existing tool which does that. Basically I
> tell it "Listen to all X streams and write them to S3/HDFS at Y path as
> JSON". I know spark streaming can be used and there is flume. But I am not
> sure about their scalability/reliability. That's why I thought to initiate
> a discussion here to see whether someone knows about that already.
>
> On Tue, Dec 6, 2016 at 5:14 PM, Sharninder  wrote:
>
> > What do you mean by streaming way? The logic to push to S3 will be in
> your
> > consumer, so it totally depends on how you want to read and store. I
> think
> > that's an easier way to do what you want to, instead of trying to backup
> > kafka and then read messages from there. Not even sure that's possible.
> >
> > On Tue, Dec 6, 2016 at 5:11 PM, Aseem Bansal 
> wrote:
> >
> > > I get that we can read them and store them in batches but is there some
> > > streaming way?
> > >
> > > On Tue, Dec 6, 2016 at 5:09 PM, Aseem Bansal 
> > wrote:
> > >
> > > > Because we need to do exploratory data analysis and machine learning.
> > We
> > > > need to backup the messages somewhere so that the data scientists can
> > > > query/load them.
> > > >
> > > > So we need something like a router that just opens up a new consumer
> > > group
> > > > which just keeps on storing them to S3.
> > > >
> > > > On Tue, Dec 6, 2016 at 5:05 PM, Sharninder Khera <
> sharnin...@gmail.com
> > >
> > > > wrote:
> > > >
> > > >> Why not just have a parallel consumer read all messages from
> whichever
> > > >> topics you're interested in and store them wherever you want to? You
> > > don't
> > > >> need to "backup" Kafka messages.
> > > >>
> > > >> _
> > > >> From: Aseem Bansal 
> > > >> Sent: Tuesday, December 6, 2016 4:55 PM
> > > >> Subject: Storing Kafka Message JSON to deep storage like S3
> > > >> To:  
> > > >>
> > > >>
> > > >> Hi
> > > >>
> > > >> Has anyone done a storage of Kafka JSON messages to deep storage
> like
> > > S3.
> > > >> We are looking to back up all of our raw Kafka JSON messages for
> > > >> Exploration. S3, HDFS, MongoDB come to mind initially.
> > > >>
> > > >> I know that it can be stored in kafka itself but storing them in
> Kafka
> > > >> itself does not seem like a good option as we won't be able to query
> > it
> > > >> and
> > > >> the configurations of machines containing kafka will have to be
> > > increased
> > > >> as we go. Something like S3 we won't have to manage.
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > --
> > Sharninder
> >
>


Re: Topic discovery when supporting multiple kafka clusters

2016-12-06 Thread Asaf Mesika
Why not re-use the same cluster? You can assign topics to live only within a
specific set of brokers. That way you have one "bus" for messages, simplifying
your application code and configuration.
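
(For reference, one way to pin a topic to a subset of brokers is a manual replica
assignment at creation time - a sketch where broker ids 3, 4, 5 and the topic name
are made up, and each comma-separated group is the replica list for one partition:)

bin/kafka-topics.sh --zookeeper zk1:2181 --create --topic team-b-events \
    --replica-assignment 3:4,4:5,5:3

That creates three partitions, each with two replicas, all confined to brokers 3-5.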

On Mon, Dec 5, 2016 at 9:43 PM Yifan Ying  wrote:

> Hi,
>
> Initially, we have only one Kafka cluster shared across all teams. But now
> this cluster is very close to out of resources (disk space, # of
> partitions, etc.). So we are considering adding another Kafka cluster. But
> what's the best practice of topic discovery, so that applications know
> which cluster their topics live? We have been using Zookeeper for service
> discovery, maybe it's also good for this purpose?
>
> Thanks
>
> --
> Yifan
>


Re: Processing older records Kafka Consumer

2016-12-06 Thread Asaf Mesika
seek() will do the trick. Just make sure that when you run it, it only runs
on partitions the current consumer is assigned (call assignment() and seek
only on the partitions assigned to you now).
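
(A minimal sketch of that approach against the 0.9.x consumer API - the bootstrap
servers, topic, group id and the approximate offset are all assumptions you would
replace:)

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReplayOldRecords {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "replay-group"); // a separate group, so the normal consumer is untouched
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        long approxOffsetTwoDaysAgo = 1_000_000L; // assumption: taken from monitoring/offset tooling

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            // poll until the group join completes and partitions are assigned to us
            while (consumer.assignment().isEmpty()) {
                consumer.poll(100);
            }
            // rewind only the partitions assigned to this consumer
            for (TopicPartition tp : consumer.assignment()) {
                consumer.seek(tp, approxOffsetTwoDaysAgo);
            }
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // re-process the record, e.g. re-send it to the dependent component
                }
            }
        }
    }
}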

On Tue, Dec 6, 2016 at 12:30 PM Amit K <amitk@gmail.com> wrote:

> Sorry for not providing complete information.
>
> I use the auto-commit. Most of the other properties are more or less the
> default one.
>
> Actually further analysis reveled that the records are consumed by consumer
> but some dependent component was down (unfortunately it went completely
> un-detected :( ). Hence now I need to reconsume them all for last 2 days.
>
> Will seek() be helpful, like having another application tuned to same topic
> and start consuming in it from approx offset that was there 2 days before?
>
> Thanks for help in advance!
>
> On Tue, Dec 6, 2016 at 3:35 PM, Asaf Mesika <asaf.mes...@gmail.com> wrote:
>
> > Do you use auto-commit or committing your self? I'm trying to figure out
> > how the offset moved if it was stuck.
> >
> > On Tue, Dec 6, 2016 at 10:28 AM Amit K <amitk@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > Is there any way to re-consume the older records from Kafka broker with
> > > kafka consumer?
> > >
> > > I am using kafka 0.9.0.0 In one of the scenario, I saw records for 2
> days
> > > from today were not consumed as consumer was stuck. When the consumer
> > > restarted, it started processing records from today but older records
> for
> > > last 2 days are not processed.
> > >
> > > Is there any way to achieve the same?
> > > Any help will be highly appreciated.
> > >
> > > Thanks,
> > > Amit
> > >
> >
>


Re: Detecting when all the retries are expired for a message

2016-12-06 Thread Asaf Mesika
Vatsal:

I don't think they merged the fix for this bug (retries don't work) from
0.9.x into 0.10.0.1: https://github.com/apache/kafka/pull/1547
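
(For anyone landing on this thread later, a minimal sketch of the pattern being
discussed - per Ismael's reply quoted below, the callback only fires once the
producer has given up, so a non-null exception there can be treated as a permanent
failure; the handling shown is made up:)

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendWithFailureCheck {

    // By the time the callback runs, any configured retries have already been
    // exhausted, so a non-null exception means this record will not be delivered.
    static void send(KafkaProducer<String, String> producer, String topic, String value) {
        producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> {
            if (exception != null) {
                // Hypothetical handling: log it, write to a dead-letter file, alert, etc.
                System.err.println("Permanently failed to send record: " + exception);
            } else {
                System.out.println("Delivered to " + metadata.topic() + "-"
                        + metadata.partition() + " @ offset " + metadata.offset());
            }
        });
    }
}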


On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal <mev...@sky.optymyze.com>
wrote:

> Hello,
>
> Bumping up this thread in case anyone of you have any say on this issue.
>
> Regards,
> Vatsal
>
> -Original Message-
> From: Mevada, Vatsal
> Sent: 02 December 2016 16:16
> To: Kafka Users <users@kafka.apache.org>
> Subject: RE: Detecting when all the retries are expired for a message
>
> I executed the same producer code for a single record file with following
> config:
>
> properties.put("bootstrap.servers", bootstrapServer);
> properties.put("key.serializer",
> StringSerializer.class.getCanonicalName());
> properties.put("value.serializer",
> StringSerializer.class.getCanonicalName());
> properties.put("acks", "-1");
> properties.put("retries", 5);
> properties.put("request.timeout.ms", 1);
>
> I have kept request.timeout.ms=10000 to make sure that message delivery will
> fail with TimeoutException. Since the retries are 5, the program should take
> at least 50000 ms (50 seconds) to complete for a single record.
> However the program is completing almost instantly with only one callback
> with TimeoutException. I suspect that the producer is not going for any
> retries. Or am I missing something in my code?
>
> My Kafka version is 0.10.0.1.
>
> Regards,
> Vatsal
> -Original Message-
> From: Ismael Juma [mailto:isma...@gmail.com]
> Sent: 02 December 2016 13:30
> To: Kafka Users <users@kafka.apache.org>
> Subject: RE: Detecting when all the retries are expired for a message
>
> The callback is called after the retries have been exhausted.
>
> Ismael
>
> On 2 Dec 2016 3:34 am, "Mevada, Vatsal" <mev...@sky.optymyze.com> wrote:
>
> > @Ismael:
> >
> > I can handle TimeoutException in the callback. However as per the
> > documentation of Callback(link: https://kafka.apache.org/0100/
> > javadoc/org/apache/kafka/clients/producer/Callback.html),
> > TimeoutException is a retriable exception and it says that it "may be
> > covered by increasing #.retries". So even if I get TimeoutException in
> > callback, wouldn't it try to send message again until all the retries
> > are done? Would it be safe to assume that message delivery is failed
> > permanently just by encountering TimeoutException in callback?
> >
> > Here is a snippet from above mentioned documentation:
> > "exception - The exception thrown during processing of this record.
> > Null if no error occurred. Possible thrown exceptions include:
> > Non-Retriable exceptions (fatal, the message will never be sent):
> > InvalidTopicException OffsetMetadataTooLargeException
> > RecordBatchTooLargeException RecordTooLargeException
> > UnknownServerException Retriable exceptions (transient, may be covered
> > by increasing #.retries): CorruptRecordException
> > InvalidMetadataException NotEnoughReplicasAfterAppendException
> > NotEnoughReplicasException OffsetOutOfRangeException TimeoutException
> > UnknownTopicOrPartitionException"
> >
> > @asaf :My kafka - API version is 0.10.0.1. So I think I should not
> > face the issue that you are mentioning. I mentioned documentation link
> > of 0.9 by mistake.
> >
> > Regards,
> > Vatsal
> > -Original Message-
> > From: Asaf Mesika [mailto:asaf.mes...@gmail.com]
> > Sent: 02 December 2016 00:32
> > To: Kafka Users <users@kafka.apache.org>
> > Subject: Re: Detecting when all the retries are expired for a message
> >
> > There's a critical bug in that section that has only been fixed in
> > 0.9.0.2 which has not been release yet. Without the fix it doesn't
> really retry.
> > I forked the kafka repo, applied the fix, built it and placed it in
> > our own Nexus Maven repository until 0.9.0.2 will be released.
> >
> > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
> >
> > Feel free to use it.
> >
> > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > > The callback should give you what you are asking for. Has it not
> > > worked as you expect when you tried it?
> > >
> > > Ismael
> > >
> > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal
> > > <mev...@sky.optymyze.com>
> > > wrote:
> > >
> > >

Re: Processing older records Kafka Consumer

2016-12-06 Thread Asaf Mesika
Do you use auto-commit or committing your self? I'm trying to figure out
how the offset moved if it was stuck.

On Tue, Dec 6, 2016 at 10:28 AM Amit K  wrote:

> Hi,
>
> Is there any way to re-consume the older records from Kafka broker with
> kafka consumer?
>
> I am using kafka 0.9.0.0 In one of the scenario, I saw records for 2 days
> from today were not consumed as consumer was stuck. When the consumer
> restarted, it started processing records from today but older records for
> last 2 days are not processed.
>
> Is there any way to achieve the same?
> Any help will be highly appreciated.
>
> Thanks,
> Amit
>


Re: Detecting when all the retries are expired for a message

2016-12-01 Thread Asaf Mesika
There's a critical bug in that section that has only been fixed in 0.9.0.2,
which has not been released yet. Without the fix it doesn't really retry.
I forked the kafka repo, applied the fix, built it and placed it in our own
Nexus Maven repository until 0.9.0.2 will be released.

https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio

Feel free to use it.

On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:

> The callback should give you what you are asking for. Has it not worked as
> you expect when you tried it?
>
> Ismael
>
> On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal 
> wrote:
>
> > Hi,
> >
> >
> >
> > I am reading a file and dumping each record on Kafka. Here is my producer
> > code:
> >
> >
> >
> > public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
> >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> >             KafkaProducer<String, String> producer = initKafkaProducer(bootstrapServers)) {
> >         String line;
> >         while ((line = bf.readLine()) != null) {
> >             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
> >                 if (e != null) {
> >                     e.printStackTrace();
> >                 }
> >             });
> >         }
> >         producer.flush();
> >     } catch (IOException e) {
> >         Throwables.propagate(e);
> >     }
> > }
> >
> > private static KafkaProducer<String, String> initKafkaProducer(String bootstrapServer) {
> >     Properties properties = new Properties();
> >     properties.put("bootstrap.servers", bootstrapServer);
> >     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> >     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> >     properties.put("acks", "-1");
> >     properties.put("retries", 10);
> >     return new KafkaProducer<>(properties);
> > }
> >
> > private BufferedReader getBufferedReader(String filePath, String encoding)
> >         throws UnsupportedEncodingException, FileNotFoundException {
> >     return new BufferedReader(new InputStreamReader(new FileInputStream(filePath),
> >             Optional.ofNullable(encoding).orElse("UTF-8")));
> > }
> >
> >
> >
> > As per the official documentation of Callback
> > <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
> > TimeoutException is a retriable exception. As I have kept retries at 10, the
> > producer will try to resend the message if delivering some message fails with
> > TimeoutException. I am looking for some reliable way to detect when delivery of
> > a message has failed permanently, after all retries.
> >
> >
> >
> > Regards,
> >
> > Vatsal
> >
>


Re: Release Kafka 0.9.0.2?

2016-12-01 Thread Asaf Mesika
Waiting for the retries fix, which doesn't work before 0.9.0.2 - I had to
apply the Pull Request myself to a forked repo to get our Log Receiver
working properly. It's a must in my opinion.

On Thu, Dec 1, 2016 at 6:46 PM Ben Osheroff  wrote:

> +1.  At the least an answer regarding timelines would be good here.
>
> On Wed, Nov 30, 2016 at 02:46:59PM +0100, Stevo Slavić wrote:
> > Hello Apache Kafka community,
> >
> > Would it be possible to release 0.9.0.2?
> >
> > It has few important fixes, like KAFKA-3594
> >  and would be helpful
> for
> > us that cannot upgrade to 0.10.x yet.
> >
> > Kind regards,
> > Stevo Slavic.
>


Re: JMXTrans Monitoring for Individual Topics

2016-07-09 Thread Asaf Mesika
Something we use which may be helpful: we use jmx2graphite
, so we won't have to manually pick
and choose metrics - we simply send them all. When all hell breaks loose,
you never know in advance which metric you'll need for root cause
analysis.


On Fri, Jul 8, 2016 at 12:28 AM Craig Swift
 wrote:

> Hello,
>
> I was hoping someone might know this off the top of their head. I haven't
> been able to find any documentation on it so I was curious if it was still
> available. We're upgrading to Kafka 8.2 from 8.1 (ya, we're a bit behind)
> and we used to have the following in jmxtrans that would give us monitoring
> on individual topics:
>
> {
>  "outputWriters": [
>   {
>"@class": "com.googlecode.jmxtrans.model.output.GraphiteWriter",
>"settings": {
> "typeNames": [ "name" ],
> "host": "bla.blabla.net",
> "port": 2013,
> "rootPrefix": "app.infrastructure.kafka.green.green-01"
>}
>   }
>  ],
>  "resultAlias": "topics.topic_name",
>  "obj": "kafka.log:type=Log,name=topic_name-*",
>  "attr": [ "Value" ]
> }
>
> which would yield metrics on size, log end offset and number of log
> segments for that topic. Is a similar option still available in 8.2?
>


Re: Partition size skew using default partitioner (without key)

2016-07-04 Thread Asaf Mesika
Apparently it's documented in the FAQ - but I ignored it since it said
"0.8.0" and I was using 0.8.2.1. After reading all the lengthy forum post
dating to 2013: The problematic code there is in DefaultEventHandler.scala,
but if I'm only using KafkaProducer.java - the java flavor - I won't be
exposed to this behaviour since you moved to NIO thus have a single socket
per broker?

On Tue, Jul 5, 2016 at 7:14 AM Asaf Mesika <asaf.mes...@gmail.com> wrote:

> Since the image is now shown, here's a direct link to it:
> https://s32.postimg.org/xoet3vu2t/image.png
>
> On Tue, Jul 5, 2016 at 7:01 AM Asaf Mesika <asaf.mes...@gmail.com> wrote:
>
>> As we continue to track down the cause I'm trying to ping back here in
>> case someone new might have an answer to the question below?
>>
>>
>> On Thu, Jun 16, 2016 at 12:39 PM Asaf Mesika <asaf.mes...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We've noticed that we have some partitions receiving more messages than
>>> others. What I've done to learn that is:
>>> * In Kafka Manager, per a given topic, the list of Partition Information
>>> is displayed.
>>> * For each partition there's a column called Latest Offset - which I
>>> assume is the producer offset. As I understand, this is the total number of
>>> messages written to this partition since the topic was created (and/or the
>>> partition was added)
>>> * I plotted the two columns: Partition number and Latest Offset.
>>>
>>> This is what I got:
>>> [image: pasted1]
>>>
>>> I'm using Kafka 0.8.2.1 with the new Producer API. We've using default
>>> partitioner, and we're *not* supplying partition number nor a key, thus
>>> it should be round-robin.
>>>
>>> From some reason, we see this.
>>>
>>> I was wondering if someone from the community ever encountered such a
>>> behaviour?
>>>
>>> Thanks!
>>>
>>> Asaf Mesika
>>> Logz.io
>>>
>>>


Re: Partition size skew using default partitioner (without key)

2016-07-04 Thread Asaf Mesika
Since the image is not shown, here's a direct link to it:
https://s32.postimg.org/xoet3vu2t/image.png

On Tue, Jul 5, 2016 at 7:01 AM Asaf Mesika <asaf.mes...@gmail.com> wrote:

> As we continue to track down the cause I'm trying to ping back here in
> case someone new might have an answer to the question below?
>
>
> On Thu, Jun 16, 2016 at 12:39 PM Asaf Mesika <asaf.mes...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We've noticed that we have some partitions receiving more messages than
>> others. What I've done to learn that is:
>> * In Kafka Manager, per a given topic, the list of Partition Information
>> is displayed.
>> * For each partition there's a column called Latest Offset - which I
>> assume is the producer offset. As I understand, this is the total number of
>> messages written to this partition since the topic was created (and/or the
>> partition was added)
>> * I plotted the two columns: Partition number and Latest Offset.
>>
>> This is what I got:
>> [image: pasted1]
>>
>> I'm using Kafka 0.8.2.1 with the new Producer API. We've using default
>> partitioner, and we're *not* supplying partition number nor a key, thus
>> it should be round-robin.
>>
>> From some reason, we see this.
>>
>> I was wondering if someone from the community ever encountered such a
>> behaviour?
>>
>> Thanks!
>>
>> Asaf Mesika
>> Logz.io
>>
>>


Re: Partition size skew using default partitioner (without key)

2016-07-04 Thread Asaf Mesika
As we continue to track down the cause I'm trying to ping back here in case
someone new might have an answer to the question below?

On Thu, Jun 16, 2016 at 12:39 PM Asaf Mesika <asaf.mes...@gmail.com> wrote:

> Hi,
>
> We've noticed that we have some partitions receiving more messages than
> others. What I've done to learn that is:
> * In Kafka Manager, per a given topic, the list of Partition Information
> is displayed.
> * For each partition there's a column called Latest Offset - which I
> assume is the producer offset. As I understand, this is the total number of
> messages written to this partition since the topic was created (and/or the
> partition was added)
> * I plotted the two columns: Partition number and Latest Offset.
>
> This is what I got:
> [image: pasted1]
>
> I'm using Kafka 0.8.2.1 with the new Producer API. We've using default
> partitioner, and we're *not* supplying partition number nor a key, thus
> it should be round-robin.
>
> From some reason, we see this.
>
> I was wondering if someone from the community ever encountered such a
> behaviour?
>
> Thanks!
>
> Asaf Mesika
> Logz.io
>
>


Re: kafka + logstash

2016-06-20 Thread Asaf Mesika
You should try opening an issue in the github repo of this plugin. Also try
writing in the LogStash forum

On Sun, 19 Jun 2016 at 20:39 Fahimeh Ashrafy 
wrote:

> Hello all
>
> I use kafka input and kafka output plugin in logstash. I have high cpu
> usage, what can I do to get it better?
> logstash version 2.3.2
> logstash-input-kafka 2.0.8
> logstash-output-kafka 2.0.5
>
> Thanks a lot
>


Partition size skew using default partitioner (without key)

2016-06-16 Thread Asaf Mesika
Hi,

We've noticed that we have some partitions receiving more messages than
others. What I've done to learn that is:
* In Kafka Manager, per a given topic, the list of Partition Information is
displayed.
* For each partition there's a column called Latest Offset - which I assume
is the producer offset. As I understand, this is the total number of
messages written to this partition since the topic was created (and/or the
partition was added)
* I plotted the two columns: Partition number and Latest Offset.

This is what I got:
[image: pasted1]

I'm using Kafka 0.8.2.1 with the new Producer API. We're using the default
partitioner, and we're *not* supplying a partition number nor a key, thus it
should be round-robin.

For some reason, we see this.

I was wondering if someone from the community ever encountered such a
behaviour?

Thanks!

Asaf Mesika
Logz.io


Re: Does the Kafka Streams DSL support non-Kafka sources/sinks?

2016-06-06 Thread Asaf Mesika
I'd stay off Camel. Its performance is quite low: up to 5-10 MB/sec
it's ok, but above that it will be your bottleneck.
The problem with Camel is that sometimes its endpoints have special
behavior which is hard to understand, and debugging it is a mess. We are now
migrating away from it.

On Fri, 3 Jun 2016 at 05:26 Christian Posta 
wrote:

> Hate to bring up "non-flashy" technology... but Apache Camel would be a
> great fit for something like this. Two java libraries each with very strong
> suits.
>
>
>
> On Thu, Jun 2, 2016 at 6:09 PM, Avi Flax  wrote:
>
> > On 6/2/16, 07:03, "Eno Thereska"  wrote:
> >
> > > Using the low-level streams API you can definitely read or write to
> > arbitrary
> > > locations inside the process() method.
> >
> > Ah, good to know — thank you!
> >
> > > However, back to your original question: even with the low-level
> streams
> > > API the sources and sinks can only be Kafka topics for now. So, as Gwen
> > > mentioned, Connect would be the way to go to bring the data to a Kafka
> > > Topic first.
> >
> > Got it — thank you!
> >
> >
>
>
> --
> *Christian Posta*
> twitter: @christianposta
> http://www.christianposta.com/blog
> http://fabric8.io
>


Re: Spikes in kafka bytes out (while bytes in remain the same)

2016-04-16 Thread Asaf Mesika
Another thought: brokers also replicate data from each other. So a record
weighing 10 bytes will be written out once for replication and one more time
to a consumer, so it will be 20 bytes out. Makes sense?
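
(Back-of-the-envelope, assuming the BytesOut metric counts follower replication
fetches as well: bytes out ~= (replication factor - 1) x bytes in for replication,
plus bytes in once per consumer group. With replication factor 2 and the single
Spark Streaming consumer, that comes to roughly 2x bytes in, which lines up with the
"a little less than 2X" seen under normal operations.)
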
On Thu, 14 Apr 2016 at 02:46 Jorge Rodriguez <jo...@bloomreach.com> wrote:

> Thanks for your response Asaf.  I have 4 brokers.  These measurements are
> from the kafka brokers.
>
> This measurement on this graph comes from Kafka.  It is a sum across all 4
> brokers of the
> metric: kafka.server.BrokerTopicMetrics.BytesInPerSec.1MinuteRate.
>
> But I also have a system metric which I feed independently using collectd
> "interface" plugin.  And the bytes out and in match the ones reported by
> kafka fairly well.  As well there is a corresponding increase in network
> packets sent.
>
> Also, in the SparkStreaming side, I can see that during these spikes, the
> number of received packets and bytes also spikes.
>
> So during the spikes, I believe that some of the fetch requests are perhaps
> failing and we hit a retry.  I am debugging that currently and I think it's
> related to the STW GC which happens on spark streaming occasionally.
> Working on some GC tuning should alleviate this.
>
> However, even if this is the case, this would not explain though why under
> normal operations, the number of bytes out is 2x the number of bytes in.
> Since I only have 1 consumer for each topic, I would expect the numbers to
> be fairly close.  Do you
>
>
>
>
> On Tue, Apr 12, 2016 at 8:31 PM, Asaf Mesika <asaf.mes...@gmail.com>
> wrote:
>
> > Where exactly do you get the measurement from? Your broker? Do you have
> > only one? Your producer? Your spark job?
> > On Mon, 11 Apr 2016 at 23:54 Jorge Rodriguez <jo...@bloomreach.com>
> wrote:
> >
> > > We are running a kafka cluster for our real-time pixel processing
> > > pipeline.  The data is produced from our pixel servers into kafka, and
> > then
> > > consumed by a spark streaming application.  Based on this, I would
> expect
> > > that the bytes in vs bytes out should be roughly equal, as each message
> > > should be consumed once.
> > >
> > > Under normal operations, the bytes out is a little less than 2X the
> bytes
> > > in.  Does anyone know why this is?  We do use a replication factor of
> 2.
> > >
> > > Occasionally, we get a spike in Bytes out.  But bytes in remain the
> same
> > > (see image below).  This correlates with a significant delay in
> > processing
> > > time in the spark streaming side.
> > >
> > > Below is a chart of kafka reported bytes out vs in.  The system level
> > > network metrics show the same information (transferred bytes spike).
> > >
> > > Could anyone provide some tips for debugging/getting to the bottom of
> > this
> > > issue?
> > >
> > > Thanks,
> > > Jorge
> > >
> > > *Kafka reported Bytes in Per topic and for all topics vs Kafka bytes
> > out:*
> > >
> > > [image: Inline image 1]
> > >
> >
>


Re: Spikes in kafka bytes out (while bytes in remain the same)

2016-04-12 Thread Asaf Mesika
Where exactly do you get the measurement from? Your broker? Do you have
only one? Your producer? Your spark job?
On Mon, 11 Apr 2016 at 23:54 Jorge Rodriguez  wrote:

> We are running a kafka cluster for our real-time pixel processing
> pipeline.  The data is produced from our pixel servers into kafka, and then
> consumed by a spark streaming application.  Based on this, I would expect
> that the bytes in vs bytes out should be roughly equal, as each message
> should be consumed once.
>
> Under normal operations, the bytes out is a little less than 2X the bytes
> in.  Does anyone know why this is?  We do use a replication factor of 2.
>
> Occasionally, we get a spike in Bytes out.  But bytes in remain the same
> (see image below).  This correlates with a significant delay in processing
> time in the spark streaming side.
>
> Below is a chart of kafka reported bytes out vs in.  The system level
> network metrics show the same information (transferred bytes spike).
>
> Could anyone provide some tips for debugging/getting to the bottom of this
> issue?
>
> Thanks,
> Jorge
>
> *Kafka reported Bytes in Per topic and for all topics vs Kafka bytes out:*
>
> [image: Inline image 1]
>


Re: New consumer: OutOfMemoryError: Direct buffer memory

2016-04-12 Thread Asaf Mesika
Track the NIO direct memory buffer size on your client JVM. Connect to it
using VisualVM, or Jolokia with hawt.io, and see where it breaks. This error
means your host ran out of memory, unless you limit it.
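
(If it helps, direct-buffer usage can also be sampled from inside the client JVM
without any external tooling, via the platform buffer pool MXBeans - a minimal
sketch, with an arbitrary 5-second interval:)

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class DirectBufferProbe {
    public static void main(String[] args) throws InterruptedException {
        while (true) {
            // "direct" covers ByteBuffer.allocateDirect; "mapped" covers memory-mapped files
            for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
                System.out.printf("%s: count=%d used=%d bytes capacity=%d bytes%n",
                        pool.getName(), pool.getCount(), pool.getMemoryUsed(), pool.getTotalCapacity());
            }
            Thread.sleep(5000);
        }
    }
}
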
On Tue, 12 Apr 2016 at 03:18 Kanak Biscuitwala  wrote:

> Hi,
>
> I'm running Kafka's new consumer with message handlers that can sometimes
> take a lot of time to return, and combining that with manual offset
> management (to get at-least-once semantics). Since poll() is the only way
> to heartbeat with the consumer, I have a thread that runs every 500
> milliseconds that does the following:
>
> 1) Pause all partitions
> 2) Call poll(0)
> 3) Resume all partitions
>
> This generally works, but I'm occasionally seeing messages like this:
>
> java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:658)
> at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
> at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
> at
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
> at
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
> at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>
> Given this trace, I have the following questions:
>
> 1) Is there data loss when this happens?
> 2) How can I stop triggering this error?
>
> Thanks,
> Kanak


Re: dumping JMX data

2016-04-12 Thread Asaf Mesika
If I'm not mistaken, jmxtrans does not let you take all metrics beans, or
just a group of them, using a wildcard. You are forced to specify the exact
bean name, and this email shows how cumbersome that is. Also, jmxtrans
issues an RPC call per bean, while with Jolokia you do one POST request to
get all the beans you listed in the body.

Plus, hawt.io on top of Jolokia makes VisualVM and the likes look pale in
comparison.
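
(For illustration, a Jolokia bulk read is just a JSON array POSTed to the agent; the
MBean names below are examples, and Jolokia also accepts patterns such as
kafka.server:type=BrokerTopicMetrics,name=*:)

POST /jolokia/
[
  { "type": "read", "mbean": "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec" },
  { "type": "read", "mbean": "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec" },
  { "type": "read", "mbean": "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec" }
]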

On Fri, 1 Apr 2016 at 11:11 Achanta Vamsi Subhash <
achanta.va...@flipkart.com> wrote:

> Why not use tools like jmxtrans 
> and
> send your metrics to Graphite/OpenTsdb.etc? Why do
> serialization/de-serialization twice?
>
> On Fri, Apr 1, 2016 at 9:51 AM, Gerard Klijs 
> wrote:
>
> > Don't know if adding it to Kafka is a good thing. I assume you need some
> > java opts settings for it to work, and with other solutions these would
> be
> > different. It could be enabled with an option off course, then it's not
> in
> > the way if you use something else.
> > We use zabbix, this is a single tool which can be used to read in jmx
> data,
> > store the data for a certain time, configurable for each item, and create
> > triggers and graphs for those.
> >
> > To see and copy jmx items we use the Oracle Java mission control, it has
> a
> > tab with info on each jmx item, which can be copied to clipboard.
> >
> > On Fri, Apr 1, 2016, 02:03 Sean Clemmer 
> wrote:
> >
> > > Another +1 for Jolokia. We've got a pretty cool setup here that deploys
> > > Jolokia alongside Kafka, and we wrote a small Sensu plugin to grab all
> > the
> > > stats from Jolokia's JSON API and reformat them for Graphite.
> > >
> > > On Thu, Mar 31, 2016 at 4:36 PM, craig w  wrote:
> > >
> > > > Including jolokia would be great, I've used for kafka and it worked
> > well.
> > > > On Mar 31, 2016 6:54 PM, "Christian Posta" <
> christian.po...@gmail.com>
> > > > wrote:
> > > >
> > > > > What if we added something like this to Kafka? https://jolokia.org
> > > > > I've added a JIRA to do that, just haven't gotten to it yet. Will
> > soon
> > > > > though, especially if it'd be useful for others.
> > > > >
> > > > > https://issues.apache.org/jira/browse/KAFKA-3377
> > > > >
> > > > > On Thu, Mar 31, 2016 at 2:55 PM, David Sidlo 
> > > > wrote:
> > > > >
> > > > > > The Kafka JmxTool works fine although it is not user friendly, in
> > > that
> > > > > you
> > > > > > cannot perform a query of the Kafka Server mbeans to determine
> > > content
> > > > > and
> > > > > > to determine the path-string that you need to place into the
> > > > -object-name
> > > > > > option.
> > > > > >
> > > > > > Here's how I solved the problem...
> > > > > >
> > > > > > First, make sure that Kafka is running with jms options
> enabled...
> > > > > >
> > > > > > -  The following opens up the jxm port with no
> > authentication
> > > > > (for
> > > > > > testing)...
> > > > > > -Dcom.sun.management.jmxremote
> > > -Dcom.sun.management.jmxremote.port=
> > > > > > -Dcom.sun.management.jmxremote.ssl=false
> > > > > > -Dcom.sun.management.jmxremote.authenticate=false
> > > > > >
> > > > > > Second, get jstatd running on the same server so that you can use
> > > > > VisualVM
> > > > > > to look into what is going on inside.
> > > > > >
> > > > > > Then, use VisualVM along with its jmx-plugin to view the mbeans
> and
> > > > > > contents.
> > > > > >
> > > > > > When using VisualVM, you will first connect to jstatd, then you
> > have
> > > to
> > > > > > right click on the host to request a JMX connection, where you
> will
> > > > get a
> > > > > > dialog, where you have to add the jmx port number to the host
> name
> > > > (after
> > > > > > the colon).
> > > > > >
> > > > > > When you view the mbeans, you will see the path to the given
> > > attribute
> > > > if
> > > > > > you hover over it with the mouse pointer, but only for a
> moment...
> > > The
> > > > > > string can be quite long and... Unfortunately, you don't have the
> > > > option
> > > > > to
> > > > > > capture that string into the cut-buffer via the interface. I
> used a
> > > > > screen
> > > > > > capture utility to capture the string and typed it into the
> > terminal.
> > > > > >
> > > > > > So here is what my first working query looked like...
> > > > > >
> > > > > > /opt/kafka/kafka/bin/kafka-run-class.sh kafka.tools.JmxTool
> > > > --object-name
> > > > > >
> > > > >
> > > >
> > >
> >
> "kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchRequestRateAndTimeMs,clientId=ReplicaFetcherThread-0-5,brokerHost=
> > > > > > hostname05.cluster.com,brokerPort=9092" --jmx-url
> > > > > > service:jmx:rmi:///jndi/rmi://`hostname`:/jmxrmi
> > > > > >
> > > > > > That command will output all of the attributes for the given
> > > > object-name,
> > > > > > and it's a long object-name.
> > > > > > With some experimentation, I found that you can use 

Re: Large Size Error even when the message is small

2016-03-02 Thread Asaf Mesika
Can you show your code for sending?

On Tue, 1 Mar 2016 at 21:59 Fang Wong  wrote:

> [2016-02-26 20:33:42,997] INFO Closing socket connection to /x due to
> invalid request: Request of length 1937006964 is not valid, [2016-02-26
> 20:33:42,997] INFO Closing socket connection to /10.224.146.58 due to
> invalid request: Request of length 1937006964 is not valid, it is larger
> than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,025] INFO Closing socket connection to /10.224.146.62
> due to invalid request: Request of length 1937006964 is not valid, it is
> larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,047] INFO Closing socket connection to /10.224.146.63
> due to invalid request: Request of length 1937006964 is not valid, it is
> larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,049] INFO Closing socket connection to /10.224.146.61
> due to invalid request: Request of length 1937006964 is not valid, it is
> larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,055] INFO Closing socket connection to /10.224.146.60
> due to invalid request: Request of length 1937006964 is not valid, it is
> larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,112] INFO Closing socket connection to /10.224.146.59
> due to invalid request: Request of length 1937006964 is not valid, it is
> larger than the maximum size of 104857600 bytes.
> (kafka.network.Processor)of
> 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to
> invalid request: Request of length 1937006964 is not valid, it is larger
> than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,047] INFO Closing socket connection to /x due to
> invalid request: Request of length 1937006964 is not valid, it is larger
> than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,049] INFO Closing socket connection to /x due to
> invalid request: Request of length 1937006964 is not valid, it is larger
> than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,055] INFO Closing socket connection to /x due to
> invalid request: Request of length 1937006964 is not valid, it is larger
> than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,112] INFO Closing socket connection to /x due to
> invalid request: Request of length 1937006964 is not valid, it is larger
> than the maximum size of 104857600 bytes. (kafka.network.Processor)
>


Re: Memory leak in new client API (Java) in 0.8.2.1?

2016-02-28 Thread Asaf Mesika
No. Since I run out of memory on the machine, but heap remains low (700mb -
2g)
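
(One thing that has helped in similar hunts - a sketch, not specific to this leak:
the JVM's Native Memory Tracking breaks down off-heap allocations by category, at
the cost of a small overhead.)

# start the client JVM with tracking enabled
java -XX:NativeMemoryTracking=summary ... YourApp

# later, take a baseline and diff the native allocations over time
jcmd <pid> VM.native_memory baseline
jcmd <pid> VM.native_memory summary.diff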

On Sun, Feb 28, 2016 at 7:44 PM Christian Posta <christian.po...@gmail.com>
wrote:

> Do you eventually run OOM?
>
> On Sunday, February 28, 2016, Asaf Mesika <asaf.mes...@gmail.com> wrote:
>
> > Hi,
> >
> > I'm seeing slow off-heap memory leak in production.
> > I've managed to recreated the scenario in a testing environment - I have
> > one ZooKeeper node and one Kafka Broker (running on same machine). I have
> > one java process which runs a thread which constantly writes to Kafka
> using
> > 16 KafkaProducer.
> > I see that the resident memory of my java process is slowly decreasing
> > (roughly 250mb every 15 min). When running GC it cleans the heap to 730mb
> > roughly but resident remains high: 2.3gb and increasing steadily. So from
> > that I presume it's a leak of off-heap memory.
> >
> > I'm little puzzled here since I haven't seen any usage of
> > ByteBuffer.allocateDirect() in the new client code, but I though I'll
> check
> > with the audience in case you are aware of any issue that might cause it?
> > I'm chasing this leak for several days, and managed to track it down to
> the
> > code writing to Kafka, so I'm a little desperate  :) any help will do.
> >
> > Thanks!
> >
> > Asaf
> >
>
>
> --
> *Christian Posta*
> twitter: @christianposta
> http://www.christianposta.com/blog
> http://fabric8.io
>


Memory leak in new client API (Java) in 0.8.2.1?

2016-02-28 Thread Asaf Mesika
Hi,

I'm seeing a slow off-heap memory leak in production.
I've managed to recreate the scenario in a testing environment - I have
one ZooKeeper node and one Kafka broker (running on the same machine). I have
one java process which runs a thread that constantly writes to Kafka using
16 KafkaProducer instances.
I see that the resident memory of my java process is slowly growing
(roughly 250mb every 15 min). When running GC it cleans the heap down to
roughly 730mb, but resident remains high: 2.3gb and increasing steadily. So
from that I presume it's a leak of off-heap memory.

I'm a little puzzled here since I haven't seen any usage of
ByteBuffer.allocateDirect() in the new client code, but I thought I'd check
with the audience in case you are aware of any issue that might cause it.
I've been chasing this leak for several days, and managed to track it down to
the code writing to Kafka, so I'm a little desperate :) any help will do.

Thanks!

Asaf