Kafka and Spark integration

2016-10-26 Thread Mohan Nani
Does anybody know the end-to-end Hadoop data flow that includes Kafka-Spark
integration?

I am primarily interested in how messages written to a Kafka partition get
captured into files.
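For reference, the basic pattern behind "messages to files", with or without Spark, is a consumer that polls the topic's partitions and appends each record to storage; Spark Streaming's Kafka integration does the consuming for you, and you then persist the resulting batches (for example to HDFS). Below is a minimal sketch with the plain Java consumer, where the topic, broker address, group id, and output path are placeholders:

    import java.io.FileWriter;
    import java.io.PrintWriter;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class KafkaToFile {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");
            props.put("group.id", "file-sink-demo");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                 PrintWriter out = new PrintWriter(new FileWriter("/data/events.txt", true))) {
                consumer.subscribe(Collections.singletonList("events"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        // one line per message: partition, offset, payload
                        out.printf("%d %d %s%n", record.partition(), record.offset(), record.value());
                    }
                    out.flush();
                }
            }
        }
    }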


Re: handleFetchRequest throw exception

2016-10-26 Thread Json Tu
Thanks, Guozhang.
Following your suggestions, I found that my own patch to Kafka 0.9.0.0 may have
caused the problem. In DelayedFetch.scala, IntelliJ's auto-import pulled in
org.apache.kafka.common.errors.NotLeaderForPartitionException instead of
kafka.common.NotLeaderForPartitionException, so the
kafka.common.NotLeaderForPartitionException thrown inside getLeaderReplicaIfLocal
could not be caught by tryComplete() and propagated all the way up to handle().
I think that is the cause of the repeated error log entries and the other strange
behavior.
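In other words, a catch clause matches on the fully qualified class, not the simple name, so an auto-import from the wrong package silently changes what gets caught. A small stand-alone illustration using another core/clients pair that shares a simple name (kafka.common.KafkaException vs. org.apache.kafka.common.KafkaException, both of which are on the 0.9 broker classpath); this is only a sketch of the mismatch, not the actual DelayedFetch code:

    import org.apache.kafka.common.KafkaException;   // the clients class, not kafka.common.KafkaException

    public class ImportPitfall {

        // Analogous to getLeaderReplicaIfLocal(): the callee throws the *core* class.
        static void throwCoreException() {
            throw new kafka.common.KafkaException("leader moved");
        }

        public static void main(String[] args) {
            try {
                throwCoreException();
            } catch (KafkaException e) {
                // Never reached: this clause only matches org.apache.kafka.common.KafkaException,
                // which is unrelated to kafka.common.KafkaException despite the identical simple
                // name, so the thrown exception escapes this try/catch -- just as the
                // NotLeaderForPartitionException escaped tryComplete() and surfaced in handle().
                System.out.println("handled locally: " + e.getMessage());
            }
        }
    }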

> On Oct 27, 2016, at 7:31 AM, Guozhang Wang wrote:
> 
> Json,
> 
> As you mentioned yourself, the "NotLeaderForPartitionException" thrown
> from getLeaderReplicaIfLocal should be caught in the end, and hence I'm
> not sure why the reported stack trace "ERROR: ..." throwing the
> NotLeaderForPartitionException should be seen from "tryComplete". Also, I
> have checked the source code in both 0.9.0.0 and 0.8.2.2, and their line
> numbers do not match the reported stack trace line numbers (e.g.
> DelayedFetch.scala:72), so I cannot really tell why you would ever see
> the error message instead of the DEBUG-level "Broker is no longer the
> leader of %s, satisfy %s immediately..".
> 
> Following the 0.9.0.0 source code: since the NotLeaderForPartitionException
> is caught and forces the delayed fetch request to be completed and sent back,
> possibly with an error code, it will not prevent the replica's fetch request
> from returning successfully to the fetching broker, and hence should not lead
> producers / consumers to fail for a long time. Similarly, since we
> force-complete those delayed fetch requests, it should not cause a spam of
> repeated error log entries, since at most one entry (which should be DEBUG,
> not ERROR) is printed for each delayed request whose partition leaders have
> migrated out.
> 
> 
> 
> Guozhang
> 
> 
> 
> On Wed, Oct 26, 2016 at 7:46 AM, Json Tu  wrote:
> 
>> It makes the cluster unable to provide normal service, which leaves some
>> producers or fetchers failing for a long time until I restart the current
>> broker. This error may come from some earlier fetch operation that contains
>> this partition, which leads to many fetch response errors.
>> 
>> The DelayedFetch tryComplete() function is implemented as below:
>> override def tryComplete() : Boolean = {
>> var accumulatedSize = 0
>> fetchMetadata.fetchPartitionStatus.foreach {
>>   case (topicAndPartition, fetchStatus) =>
>> val fetchOffset = fetchStatus.startOffsetMetadata
>> try {
>>   if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
>> val replica = 
>> replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic,
>> topicAndPartition.partition)
>> /*ignore some codes*/
>>   }
>> } catch {
>>   /*ignore some code*/
>>   case nle: NotLeaderForPartitionException =>  // Case A
>> debug("Broker is no longer the leader of %s, satisfy %s
>> immediately".format(topicAndPartition, fetchMetadata))
>> return forceComplete()
>> }
>> }
>> /* ignore some codes */
>> }
>> 
>> When it meets a NotLeaderForPartitionException, it will invoke the
>> forceComplete() function, which in turn invokes the onComplete() function,
>> implemented as below:
>> override def onComplete() {
>> val logReadResults = replicaManager.readFromLocalLog(
>> fetchMetadata.fetchOnlyLeader,
>>   fetchMetadata.fetchOnlyCommitted,
>>   fetchMetadata.fetchPartitionStatus.mapValues(status =>
>> status.fetchInfo))
>> 
>> val fetchPartitionData = logReadResults.mapValues(result =>
>>   FetchResponsePartitionData(result.errorCode, result.hw,
>> result.info.messageSet))
>> 
>> responseCallback(fetchPartitionData)
>> }
>> 
>> So I think it exits the tryComplete() function early because of this
>> partition, which means the later partitions in this request may not be
>> completely satisfied and returned to the fetching broker, which leaves some
>> producers and consumers failing for a long time. I don't know whether that
>> is correct.
>> 
>>> On Oct 25, 2016, at 8:32 PM, Json Tu wrote:
>>> 
>>> Hi all,
>>>  I use Kafka 0.9.0.0, and we have a cluster with 6 nodes, when I
>> restart a broker,we find there are many logs as below,
>>> 
>>> [2016-10-24 15:29:00,914] ERROR [KafkaApi-2141642] error when handling
>> request Name: FetchRequest; Version: 1; CorrelationId: 4928; ClientId:
>> ReplicaFetcherThread-0-2141642; ReplicaId: 2141386; MaxWait: 500 ms;
>> MinBytes: 1 bytes; RequestInfo: [retail.c.order.logistics,0] ->
>> PartitionFetchInfo(215258,1048576),[waimai_c_ugc_msg,1] ->
>> PartitionFetchInfo(12426588,1048576),[waimai_c_ucenter_asyncrelationbind_delay,25]
>> -> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,44] ->
>> PartitionFetchInfo(49555913,1048576),[__consumer_offsets,23] ->
>> PartitionFetchInfo(11846051,1048576),[retail.m.sp.sku.update,3] ->
>> PartitionFetchInfo(21563,1048576),[waimai_c_monitor_orderlogisticsstatus,28]
>> -> PartitionFetchInfo(26926356,1048576),[waimai_c_ucenter_loadrelation,0]
>> -> Partitio

Re: handleFetchRequest throw exception

2016-10-26 Thread Guozhang Wang
Json,

As you mentioned yourself, the "NotLeaderForPartitionException" thrown
from getLeaderReplicaIfLocal should be caught in the end, and hence I'm
not sure why the reported stack trace "ERROR: ..." throwing the
NotLeaderForPartitionException should be seen from "tryComplete". Also, I
have checked the source code in both 0.9.0.0 and 0.8.2.2, and their line
numbers do not match the reported stack trace line numbers (e.g.
DelayedFetch.scala:72), so I cannot really tell why you would ever see
the error message instead of the DEBUG-level "Broker is no longer the
leader of %s, satisfy %s immediately..".

Following the 0.9.0.0 source code: since the NotLeaderForPartitionException
is caught and forces the delayed fetch request to be completed and sent back,
possibly with an error code, it will not prevent the replica's fetch request
from returning successfully to the fetching broker, and hence should not lead
producers / consumers to fail for a long time. Similarly, since we
force-complete those delayed fetch requests, it should not cause a spam of
repeated error log entries, since at most one entry (which should be DEBUG,
not ERROR) is printed for each delayed request whose partition leaders have
migrated out.



Guozhang



On Wed, Oct 26, 2016 at 7:46 AM, Json Tu  wrote:

> It makes the cluster unable to provide normal service, which leaves some
> producers or fetchers failing for a long time until I restart the current
> broker. This error may come from some earlier fetch operation that contains
> this partition, which leads to many fetch response errors.
>
> The DelayedFetch tryComplete() function is implemented as below:
>  override def tryComplete() : Boolean = {
>  var accumulatedSize = 0
>  fetchMetadata.fetchPartitionStatus.foreach {
>case (topicAndPartition, fetchStatus) =>
>  val fetchOffset = fetchStatus.startOffsetMetadata
>  try {
>if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
>  val replica = 
> replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic,
> topicAndPartition.partition)
>  /*ignore some codes*/
>}
>  } catch {
>/*ignore some code*/
>case nle: NotLeaderForPartitionException =>  // Case A
>  debug("Broker is no longer the leader of %s, satisfy %s
> immediately".format(topicAndPartition, fetchMetadata))
>  return forceComplete()
>  }
>  }
>  /* ignore some codes */
> }
>
> When it meets a NotLeaderForPartitionException, it will invoke the
> forceComplete() function, which in turn invokes the onComplete() function,
> implemented as below:
> override def onComplete() {
>  val logReadResults = replicaManager.readFromLocalLog(
> fetchMetadata.fetchOnlyLeader,
>fetchMetadata.fetchOnlyCommitted,
>fetchMetadata.fetchPartitionStatus.mapValues(status =>
> status.fetchInfo))
>
>  val fetchPartitionData = logReadResults.mapValues(result =>
>FetchResponsePartitionData(result.errorCode, result.hw,
> result.info.messageSet))
>
>  responseCallback(fetchPartitionData)
> }
>
> So I think it exits the tryComplete() function early because of this
> partition, which means the later partitions in this request may not be
> completely satisfied and returned to the fetching broker, which leaves some
> producers and consumers failing for a long time. I don't know whether that
> is correct.
>
> > On Oct 25, 2016, at 8:32 PM, Json Tu wrote:
> >
> > Hi all,
> > I use Kafka 0.9.0.0, and we have a cluster with 6 nodes; when I
> > restart a broker, we find many logs like the one below:
> >
> > [2016-10-24 15:29:00,914] ERROR [KafkaApi-2141642] error when handling
> request Name: FetchRequest; Version: 1; CorrelationId: 4928; ClientId:
> ReplicaFetcherThread-0-2141642; ReplicaId: 2141386; MaxWait: 500 ms;
> MinBytes: 1 bytes; RequestInfo: [retail.c.order.logistics,0] ->
> PartitionFetchInfo(215258,1048576),[waimai_c_ugc_msg,1] ->
> PartitionFetchInfo(12426588,1048576),[waimai_c_ucenter_asyncrelationbind_delay,25]
> -> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,44] ->
> PartitionFetchInfo(49555913,1048576),[__consumer_offsets,23] ->
> PartitionFetchInfo(11846051,1048576),[retail.m.sp.sku.update,3] ->
> PartitionFetchInfo(21563,1048576),[waimai_c_monitor_orderlogisticsstatus,28]
> -> PartitionFetchInfo(26926356,1048576),[waimai_c_ucenter_loadrelation,0]
> -> PartitionFetchInfo(54583,1048576),[__consumer_offsets,29] ->
> PartitionFetchInfo(23479045,1048576),[waimai_c_order_databus_wmorder,14]
> -> PartitionFetchInfo(49568225,1048576),[waimai_c_ordertag_orderdealremark,31]
> -> PartitionFetchInfo(1829838,1048576),[retail.d.ris.spider.request,0] ->
> PartitionFetchInfo(709845,1048576),[__consumer_offsets,13] ->
> PartitionFetchInfo(9376691,1048576),[waimai_c_ugc_msg_staging,2] ->
> PartitionFetchInfo(38,1048576),[retail.b.openapi.push.retry.stage,0] ->
> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_favoritepoi,15] ->
> PartitionFetchInfo(390045,1048576),[retail.b.order.phonecall,0] ->
> PartitionFetchInfo(1,1048576),[waimai_c_ucenter_loadrelation

Re: kafka streaming rocks db lock bug?

2016-10-26 Thread Guozhang Wang
Hello Ara,

I have looked through the logs you sent to me (thanks!), and I suspect the
"file does not exist" issue is because your state directory is set to the
default `/tmp/kafka-streams` folder. As for the "LOCK: No locks available"
and "Failed to lock the state directory" issues, it seems you are doing a
rolling bounce of the stream instances (judging from the observed "shutdown"
log entries), and I suspect that because the state directories have been
auto-deleted, a thread tries to re-create them while the locks are still held
during the shutdown process.

Could you set the "state.dir" config to another directory and see if this
issue goes away?

http://docs.confluent.io/3.0.0/streams/developer-guide.html#optional-configuration-parameters
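For example, something like this in the Streams configuration (the path below is only an illustration; any directory outside of /tmp that survives periodic cleanup will do):

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
    props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");  // instead of the default /tmp/kafka-streams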


As for a detailed description of failover, I'd recommend this doc
section:

http://docs.confluent.io/3.0.1/streams/architecture.html#fault-tolerance


As well as this blog post:

http://www.confluent.io/blog/elastic-scaling-in-kafka-streams/

Guozhang


On Tue, Oct 25, 2016 at 8:34 AM, Guozhang Wang  wrote:

> Logs would be very helpful, I can look further into it.
>
> Thanks Ara.
>
> On Mon, Oct 24, 2016 at 11:04 PM, Ara Ebrahimi <
> ara.ebrah...@argyledata.com> wrote:
>
>> This was in 0.10.1.0. What happened was that a Kafka broker went down, and
>> then this happened on the Kafka Streams instance that had a connection to
>> that broker. I can send you all the logs I got.
>>
>> Ara.
>>
>> On Oct 24, 2016, at 10:41 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> Hello Ara,
>>
>> The issue you encountered seems to be KAFKA-3812 and KAFKA-3938. Could you
>> try to upgrade to the newly released 0.10.1.0 version and see if this issue
>> goes away? If not, I would love to investigate this issue further with you.
>>
>>
>> Guozhang
>>
>>
>>
>> Guozhang
>>
>>
>> On Sun, Oct 23, 2016 at 1:45 PM, Ara Ebrahimi <
>> ara.ebrah...@argyledata.com>
>> wrote:
>>
>> And then this on a different node:
>>
>> 2016-10-23 13:43:57 INFO  StreamThread:286 - stream-thread
>> [StreamThread-3] Stream thread shutdown complete
>> 2016-10-23 13:43:57 ERROR StreamPipeline:169 - An exception has occurred
>> org.apache.kafka.streams.errors.StreamsException: stream-thread
>> [StreamThread-3] Failed to rebalance
>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>> StreamThread.java:401)
>> at org.apache.kafka.streams.processor.internals.
>> StreamThread.run(StreamThread.java:235)
>> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error
>> while creating the state manager
>> at org.apache.kafka.streams.processor.internals.AbstractTask.(
>> AbstractTask.java:72)
>> at org.apache.kafka.streams.processor.internals.
>> StreamTask.(StreamTask.java:90)
>> at org.apache.kafka.streams.processor.internals.
>> StreamThread.createStreamTask(StreamThread.java:622)
>> at org.apache.kafka.streams.processor.internals.
>> StreamThread.addStreamTasks(StreamThread.java:649)
>> at org.apache.kafka.streams.processor.internals.StreamThread.access$000(
>> StreamThread.java:69)
>> at org.apache.kafka.streams.processor.internals.StreamThread$1.
>> onPartitionsAssigned(StreamThread.java:120)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
>> onJoinComplete(ConsumerCoordinator.java:228)
>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>> joinGroupIfNeeded(AbstractCoordinator.java:313)
>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>> ensureActiveGroup(AbstractCoordinator.java:277)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
>> ConsumerCoordinator.java:259)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.
>> pollOnce(KafkaConsumer.java:1013)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>> KafkaConsumer.java:979)
>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>> StreamThread.java:398)
>> ... 1 more
>> Caused by: java.io.IOException: task [7_1] Failed to lock the state
>> directory: /tmp/kafka-streams/argyle-streams/7_1
>> at org.apache.kafka.streams.processor.internals.
>> ProcessorStateManager.(ProcessorStateManager.java:98)
>> at org.apache.kafka.streams.processor.internals.AbstractTask.(
>> AbstractTask.java:69)
>> ... 13 more
>>
>> Ara.
>>
>> On Oct 23, 2016, at 1:24 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
>>
>> Hi,
>>
>> This happens when I hammer our 5 kafka streaming nodes (each with 4
>> streaming threads) hard enough for an hour or so:
>>
>> 2016-10-23 13:04:17 ERROR StreamThread:324 - stream-thread
>> [StreamThread-2] Failed to flush state for StreamTask 3_8:
>> org.apache.kafka.streams.errors.ProcessorStateException: task [3_8]
>> Failed to flush state store streams-data-record-stats-avro-br-store
>> at o

Re: issue with custom processor flush to rocksdb store

2016-10-26 Thread saiprasad mishra
Yes, this is similar, meaning it was all about KafkaStreams not being started
correctly in my Spring app and NOT a bug in KafkaStreams.
In the comments on the JIRA I have described what I was doing wrong.

These types of exceptions largely indicate that Kafka Streams was not started
correctly.
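For anyone else who hits this, the usual construct-then-start sequence looks roughly like the sketch below (written against the 0.10.1.0 API; the application id, broker address, and topic names are placeholders):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class StreamsStartup {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            KStreamBuilder builder = new KStreamBuilder();
            KStream<String, String> source = builder.stream("source-topic");
            source.filter((key, value) -> value != null).to("output-topic");

            // State stores and their flush/commit loop only run once start() has been called.
            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }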

Thanks for your valuable time on this
Regards
Sai

On Wed, Oct 26, 2016 at 2:34 PM, Guozhang Wang  wrote:

> Is it a similar report as https://issues.apache.org/jira/browse/KAFKA-4344
> ?
>
> On Tue, Oct 25, 2016 at 2:43 PM, saiprasad mishra <
> saiprasadmis...@gmail.com
> > wrote:
>
> > Hi
> > This is with Kafka Streams version 0.10.1.0 (the server running remotely and
> > the streams app running locally on my laptop).
> >
> >
> >
> > I have a kafka stream pipeline like this
> >
> > source topic(with 10 partitions) stream -> filter for null value ->map to
> > make it keyed by id ->custom processor to mystore(persistent)
> >
> > I am getting the exception below. It happens when the flush happens.
> > If I restart the app, the data I sent is actually present in the RocksDB
> > store.
> > I see that the message for the keyed stream went to partition 0, where the
> > flush happened correctly, I guess, while below the partition 9 task failed
> > to flush; I am not sure about the complaint about timestamp() here.
> >
> > Can somebody explain what this means?
> >
> >
> > Not sure if it has something to do with the timestamp extractor property
> > I am setting below, or with some other time such as the producer create
> > time?
> >
> > props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> > ConsumerRecordTimestampExtractor.class);
> >
> >
> > Regards
> > Sai
> >
> >
> > 2016-10-25 14:31:29.822000
> > org.apache.kafka.streams.processor.internals.StreamThread StreamThread-1
> > ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_9
> state:
> >
> >
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_9]
> Failed
> > to flush state store Products
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> > ProcessorStateManager.java:331)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.
> > java:275)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> > StreamThread.java:576)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> > StreamThread.java:562)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> > StreamThread.java:538)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:456)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:242)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > Caused by: java.lang.IllegalStateException: This should not happen as
> > timestamp() should only be called while a record is processed
> >
> > at
> > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.
> > timestamp(ProcessorContextImpl.java:192)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> > StoreChangeLogger.java:112)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.state.internals.RocksDBStore.
> > flush(RocksDBStore.java:375)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(
> > MeteredKeyValueStore.java:175)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> > ProcessorStateManager.java:329)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > ... 6 more
> >
>
>
>
> --
> -- Guozhang
>


Re: issue with custom processor flush to rocksdb store

2016-10-26 Thread Guozhang Wang
Is it a similar report as https://issues.apache.org/jira/browse/KAFKA-4344?

On Tue, Oct 25, 2016 at 2:43 PM, saiprasad mishra  wrote:

> Hi
> This is with Kafka Streams version 0.10.1.0 (the server running remotely and
> the streams app running locally on my laptop).
>
>
>
> I have a kafka stream pipeline like this
>
> source topic(with 10 partitions) stream -> filter for null value ->map to
> make it keyed by id ->custom processor to mystore(persistent)
>
> I am getting the exception below. It happens when the flush happens.
> If I restart the app, the data I sent is actually present in the RocksDB store.
> I see that the message for the keyed stream went to partition 0, where the
> flush happened correctly, I guess, while below the partition 9 task failed to
> flush; I am not sure about the complaint about timestamp() here.
>
> Can somebody explain what this means?
>
>
> Not sure if it has something to do with the timestamp extractor property I am
> setting below, or with some other time such as the producer create time?
>
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> ConsumerRecordTimestampExtractor.class);
>
>
> Regards
> Sai
>
>
> 2016-10-25 14:31:29.822000
> org.apache.kafka.streams.processor.internals.StreamThread StreamThread-1
> ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_9 state:
>
>
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_9] Failed
> to flush state store Products
>
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(
> ProcessorStateManager.java:331)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.
> java:275)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> StreamThread.java:576)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> StreamThread.java:562)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> StreamThread.java:538)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:456)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> Caused by: java.lang.IllegalStateException: This should not happen as
> timestamp() should only be called while a record is processed
>
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.
> timestamp(ProcessorContextImpl.java:192)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:112)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.
> flush(RocksDBStore.java:375)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(
> MeteredKeyValueStore.java:175)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(
> ProcessorStateManager.java:329)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> ... 6 more
>



-- 
-- Guozhang


Re: [VOTE] Add REST Server to Apache Kafka

2016-10-26 Thread Andrew Otto
-1 for http kafka client in core

Although a read-only management interface, perhaps via HTTP, sounds kinda
useful for things like health checks, as mentioned.


On Wed, Oct 26, 2016 at 2:00 PM, Zakee  wrote:

> -1
>
> Thanks.
> > On Oct 25, 2016, at 2:16 PM, Harsha Chintalapani 
> wrote:
> >
> > Hi All,
> >   We are proposing to have a REST Server as part of  Apache Kafka
> > to provide producer/consumer/admin APIs. We Strongly believe having
> > REST server functionality with Apache Kafka will help a lot of users.
> > Here is the KIP that Mani Kumar wrote
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 80:+Kafka+Rest+Server.
> > There is a discussion thread in dev list that had differing opinions on
> > whether to include REST server in Apache Kafka or not. You can read more
> > about that in this thread
> > http://mail-archives.apache.org/mod_mbox/kafka-dev/201610.mbox/%3CCAMVt_
> aymqeudm39znsxgktpdde46sowmqhsxop-+jmbcuv7...@mail.gmail.com%3E
> >
> >  This is a VOTE thread to check interest in the community for
> > adding REST Server implementation in Apache Kafka.
> >
> > Thanks,
> > Harsha
>
> 


Re: [VOTE] Add REST Server to Apache Kafka

2016-10-26 Thread Zakee
-1

Thanks.
> On Oct 25, 2016, at 2:16 PM, Harsha Chintalapani  wrote:
> 
> Hi All,
>   We are proposing to have a REST Server as part of  Apache Kafka
> to provide producer/consumer/admin APIs. We Strongly believe having
> REST server functionality with Apache Kafka will help a lot of users.
> Here is the KIP that Mani Kumar wrote
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-80:+Kafka+Rest+Server.
> There is a discussion thread in dev list that had differing opinions on
> whether to include REST server in Apache Kafka or not. You can read more
> about that in this thread
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201610.mbox/%3ccamvt_aymqeudm39znsxgktpdde46sowmqhsxop-+jmbcuv7...@mail.gmail.com%3E
> 
>  This is a VOTE thread to check interest in the community for
> adding REST Server implementation in Apache Kafka.
> 
> Thanks,
> Harsha



Re: [VOTE] Add REST Server to Apache Kafka

2016-10-26 Thread Guruditta Golani
+1

Supporting this natively will help immensely in building operational tools on
top of it and will improve ease of use in large deployments.

-Guru

From: Shekar Tippur 
Sent: Wednesday, October 26, 2016 9:23 AM
To: users
Cc: d...@kafka.apache.org
Subject: Re: [VOTE] Add REST Server to Apache Kafka

+1

Thanks


Re: [VOTE] Add REST Server to Apache Kafka

2016-10-26 Thread Dana Powers
-1


On Wed, Oct 26, 2016 at 9:23 AM, Shekar Tippur  wrote:
> +1
>
> Thanks


Re: [VOTE] Add REST Server to Apache Kafka

2016-10-26 Thread Shekar Tippur
+1

Thanks


Re: [VOTE] Add REST Server to Apache Kafka

2016-10-26 Thread Ben Davison
I'm going to change my vote to a -1, great points made by all.

(Although I would want to have another discussion around a "management"
REST endpoint for Kafka, I think there would be value in that)

On Wed, Oct 26, 2016 at 3:56 PM, Samuel Taylor 
wrote:

> -1
>
> I don't think the reasons from the KIP are good enough:
>
> 1 -- That other tools have a REST interface isn't a reason Kafka needs one.
> Further, "it is useful" is not a reason; *why* is it useful?
> 2 -- Why does a REST server *need* to be immediately available to every user
> that downloads Kafka?
> 3 -- Maintaining compatibility between Kafka and a REST proxy seems
> like a burden that should be on the maintainers of the REST proxy, not on
> the core team.
>
> On Wed, Oct 26, 2016 at 2:48 AM, Jan Filipiak 
> wrote:
>
> > And you also still need to find the correct broker for each HTTP call,
> > which is also hard when programming against the HTTP API.
> >
> >
> > On 26.10.2016 09:46, Jan Filipiak wrote:
> >
> >> So happy to see this reply.
> >> I do think the same, actually makes it way harder to properly batch up
> >> records on http, as kafka core would need to know how to split your
> payload.
> >> It would help people do the wrong thing IMO
> >>
> >> best Jan
> >>
> >> On 25.10.2016 23:58, Jay Kreps wrote:
> >>
> >>> -1
> >>>
> >>> I think the REST server for Kafka that already exists is quite good and
> >>> getting contributions. Moving this into the core project doesn't solve
> a
> >>> problem that I see.
> >>>
> >>> -Jay
> >>>
> >>> On Tue, Oct 25, 2016 at 2:16 PM, Harsha Chintalapani 
> >>> wrote:
> >>>
> >>> Hi All,
>  We are proposing to have a REST Server as part of Apache
>  Kafka
>  to provide producer/consumer/admin APIs. We Strongly believe having
>  REST server functionality with Apache Kafka will help a lot of users.
>  Here is the KIP that Mani Kumar wrote
>  https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>  80:+Kafka+Rest+Server.
>  There is a discussion thread in dev list that had differing opinions
> on
>  whether to include REST server in Apache Kafka or not. You can read
> more
>  about that in this thread
>  http://mail-archives.apache.org/mod_mbox/kafka-dev/201610.mb
>  ox/%3CCAMVt_
>  aymqeudm39znsxgktpdde46sowmqhsxop-+jmbcuv7...@mail.gmail.com%3E
> 
> This is a VOTE thread to check interest in the community
> for
>  adding REST Server implementation in Apache Kafka.
> 
>  Thanks,
>  Harsha
> 
> 
> >>
> >
>
>
> --
> *Samuel Taylor*
> Data Science
>
> *Square Root, Inc. *
> Square-Root.com 
>



Re: [VOTE] Add REST Server to Apache Kafka

2016-10-26 Thread Samuel Taylor
-1

I don't think the reasons from the KIP are good enough:

1 -- That other tools have a REST interface isn't a reason Kafka needs one.
Further, "it is useful" is not a reason; *why* is it useful?
2 -- Why does a REST server *need* to be immediately available to every user
that downloads Kafka?
3 -- Maintaining compatibility between Kafka and a REST proxy seems
like a burden that should be on the maintainers of the REST proxy, not on
the core team.

On Wed, Oct 26, 2016 at 2:48 AM, Jan Filipiak 
wrote:

> And you also still need to find the correct broker for each HTTP call,
> which is also hard when programming against the HTTP API.
>
>
> On 26.10.2016 09:46, Jan Filipiak wrote:
>
>> So happy to see this reply.
>> I do think the same, actually makes it way harder to properly batch up
>> records on http, as kafka core would need to know how to split your payload.
>> It would help people do the wrong thing IMO
>>
>> best Jan
>>
>> On 25.10.2016 23:58, Jay Kreps wrote:
>>
>>> -1
>>>
>>> I think the REST server for Kafka that already exists is quite good and
>>> getting contributions. Moving this into the core project doesn't solve a
>>> problem that I see.
>>>
>>> -Jay
>>>
>>> On Tue, Oct 25, 2016 at 2:16 PM, Harsha Chintalapani 
>>> wrote:
>>>
>>> Hi All,
 We are proposing to have a REST Server as part of Apache
 Kafka
 to provide producer/consumer/admin APIs. We Strongly believe having
 REST server functionality with Apache Kafka will help a lot of users.
 Here is the KIP that Mani Kumar wrote
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-
 80:+Kafka+Rest+Server.
 There is a discussion thread in dev list that had differing opinions on
 whether to include REST server in Apache Kafka or not. You can read more
 about that in this thread
 http://mail-archives.apache.org/mod_mbox/kafka-dev/201610.mb
 ox/%3CCAMVt_
 aymqeudm39znsxgktpdde46sowmqhsxop-+jmbcuv7...@mail.gmail.com%3E

This is a VOTE thread to check interest in the community for
 adding REST Server implementation in Apache Kafka.

 Thanks,
 Harsha


>>
>


-- 
*Samuel Taylor*
Data Science

*Square Root, Inc. *
Square-Root.com 


Re: handleFetchRequest throw exception

2016-10-26 Thread Json Tu
It makes the cluster unable to provide normal service, which leaves some producers
or fetchers failing for a long time until I restart the current broker.
This error may come from some earlier fetch operation that contains this
partition, which leads to many fetch response errors.

The DelayedFetch tryComplete() function is implemented as below:

override def tryComplete() : Boolean = {
  var accumulatedSize = 0
  fetchMetadata.fetchPartitionStatus.foreach {
    case (topicAndPartition, fetchStatus) =>
      val fetchOffset = fetchStatus.startOffsetMetadata
      try {
        if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
          // fetch from the local leader replica of this partition
          val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
          /* ignore some codes */
        }
      } catch {
        /* ignore some code */
        case nle: NotLeaderForPartitionException =>  // Case A: this broker is no longer the leader
          debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
          return forceComplete()
      }
  }
  /* ignore some codes */
}

When it meets a NotLeaderForPartitionException, it will invoke the forceComplete()
function, which in turn invokes the onComplete() function, implemented as below:
override def onComplete() {
  // re-read whatever is currently available from the local log
  val logReadResults = replicaManager.readFromLocalLog(fetchMetadata.fetchOnlyLeader,
    fetchMetadata.fetchOnlyCommitted,
    fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo))

  // build the per-partition response, including any error codes
  val fetchPartitionData = logReadResults.mapValues(result =>
    FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))

  responseCallback(fetchPartitionData)
}

So I think it exits the tryComplete() function early because of this
partition, which means the later partitions in this request may not be
completely satisfied and returned to the fetching broker, which leaves some
producers and consumers failing for a long time. I don't know whether that
is correct.

> On Oct 25, 2016, at 8:32 PM, Json Tu wrote:
> 
> Hi all,
> I use Kafka 0.9.0.0, and we have a cluster with 6 nodes; when I restart
> a broker, we find many logs like the one below:
> 
> [2016-10-24 15:29:00,914] ERROR [KafkaApi-2141642] error when handling 
> request Name: FetchRequest; Version: 1; CorrelationId: 4928; ClientId: 
> ReplicaFetcherThread-0-2141642; ReplicaId: 2141386; MaxWait: 500 ms; 
> MinBytes: 1 bytes; RequestInfo: [retail.c.order.logistics,0] -> 
> PartitionFetchInfo(215258,1048576),[waimai_c_ugc_msg,1] -> 
> PartitionFetchInfo(12426588,1048576),[waimai_c_ucenter_asyncrelationbind_delay,25]
>  -> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,44] -> 
> PartitionFetchInfo(49555913,1048576),[__consumer_offsets,23] -> 
> PartitionFetchInfo(11846051,1048576),[retail.m.sp.sku.update,3] -> 
> PartitionFetchInfo(21563,1048576),[waimai_c_monitor_orderlogisticsstatus,28] 
> -> PartitionFetchInfo(26926356,1048576),[waimai_c_ucenter_loadrelation,0] -> 
> PartitionFetchInfo(54583,1048576),[__consumer_offsets,29] -> 
> PartitionFetchInfo(23479045,1048576),[waimai_c_order_databus_wmorder,14] -> 
> PartitionFetchInfo(49568225,1048576),[waimai_c_ordertag_orderdealremark,31] 
> -> PartitionFetchInfo(1829838,1048576),[retail.d.ris.spider.request,0] -> 
> PartitionFetchInfo(709845,1048576),[__consumer_offsets,13] -> 
> PartitionFetchInfo(9376691,1048576),[waimai_c_ugc_msg_staging,2] -> 
> PartitionFetchInfo(38,1048576),[retail.b.openapi.push.retry.stage,0] -> 
> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_favoritepoi,15] -> 
> PartitionFetchInfo(390045,1048576),[retail.b.order.phonecall,0] -> 
> PartitionFetchInfo(1,1048576),[waimai_c_ucenter_loadrelation,45] -> 
> PartitionFetchInfo(53975,1048576),[waimai_c_ordertag_orderdealremark,1] -> 
> PartitionFetchInfo(1829848,1048576),[retail.d.ris.spider.jddj.request,0] -> 
> PartitionFetchInfo(5116337,1048576),[waimai_c_ucenter_asyncrelationbind_delay,13]
>  -> 
> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_asyncrelationbind_delay,55] 
> -> PartitionFetchInfo(0,1048576),[waimai_push_e_operate_prod,3] -> 
> PartitionFetchInfo(442564,1048576),[waimai_ordersa_topic_user_order_in_poi_count_diff,5]
>  -> PartitionFetchInfo(23791010,1048576),[retail.c.order.create,4] -> 
> PartitionFetchInfo(72902,1048576),[waimai_c_ucenter_asyncrelationbind_staging,2]
>  -> PartitionFetchInfo(66,1048576),[waimai_c_order_orderevent_topic,35] -> 
> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_syncuserrelation,43] -> 
> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,48] -> 
> PartitionFetchInfo(49496018,1048576),[waimai_c_monitor_orderstatus,32] -> 
> PartitionFetchInfo(50623699,1048576),[waimai_c_ucenter_loadrelation,15] -> 
> PartitionFetchInfo(54360,1048576),[waimai_c_monitor_orderstatus,2] -> 
> PartitionFetchInfo(50624881,1048576),[waimai_c_order_databus_wmorder,24] -> 
> PartitionFetchInfo(49548334,1048576),[waimai_c_order_databus_wmorder,18] -> 
> PartitionFetchInfo(49489397,1048576),[waimai_c_ucenter_asyncrelationbind,36] 
> -> PartitionF

Re: TimeoutException: failed to update metadata after

2016-10-26 Thread Daniel Wikström
KafkaProducer has a method partitionsFor() that fetches the metadata, so
something like this can be used to force-load the metadata when max.block.ms=0:


// Busy-waits (with a short sleep) until the topic metadata is available.
boolean gotMetadata = false;
while (!gotMetadata) {
    try {
        producer.partitionsFor(topic);   // throws TimeoutException while metadata is unavailable
        gotMetadata = true;
    } catch (TimeoutException e) {       // org.apache.kafka.common.errors.TimeoutException
        logger.info(String.format("KafkaProducer for topic %s is waiting for metadata...", topic));
        try {
            Thread.sleep(100);
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();  // preserve the interrupt status
        }
    }
}
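For context, a sketch of the producer configuration the loop above assumes (broker address and serializers are placeholders; ProducerConfig, KafkaProducer, and StringSerializer come from org.apache.kafka.clients.producer and org.apache.kafka.common.serialization):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "0");  // never block inside send()/partitionsFor()
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    // run the wait-for-metadata loop once before the first send()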



Re: Kafka multitenancy

2016-10-26 Thread Eno Thereska
Hi Mudit,

This is a relevant read:
http://www.confluent.io/blog/sharing-is-caring-multi-tenancy-in-distributed-data-systems

It talks about two aspects of multi-tenancy in Kafka: security, and performance
isolation through quotas.

Eno

> On 26 Oct 2016, at 04:22, Mudit Agarwal  wrote:
> 
> Hi,
> How can we achieve multi-tenancy in Kafka efficiently?
> Thanks, Mudit



Kafka multitenancy

2016-10-26 Thread Mudit Agarwal
Hi,
How can we achieve multi-tenancy in Kafka efficiently?
Thanks, Mudit

Re: [VOTE] Add REST Server to Apache Kafka

2016-10-26 Thread Jan Filipiak
And you also still need to find the correct broker for each HTTP call,
which is also hard when programming against the HTTP API.


On 26.10.2016 09:46, Jan Filipiak wrote:

So happy to see this reply.
I think the same: HTTP actually makes it way harder to properly batch up
records, as Kafka core would need to know how to split your payload.

It would help people do the wrong thing, IMO.

best Jan

On 25.10.2016 23:58, Jay Kreps wrote:

-1

I think the REST server for Kafka that already exists is quite good and
getting contributions. Moving this into the core project doesn't solve a
problem that I see.

-Jay

On Tue, Oct 25, 2016 at 2:16 PM, Harsha Chintalapani 
wrote:


Hi All,
We are proposing to have a REST Server as part of Apache 
Kafka

to provide producer/consumer/admin APIs. We Strongly believe having
REST server functionality with Apache Kafka will help a lot of users.
Here is the KIP that Mani Kumar wrote
https://cwiki.apache.org/confluence/display/KAFKA/KIP-
80:+Kafka+Rest+Server.
There is a discussion thread in dev list that had differing opinions on
whether to include REST server in Apache Kafka or not. You can read 
more

about that in this thread
http://mail-archives.apache.org/mod_mbox/kafka-dev/201610.mbox/%3CCAMVt_ 


aymqeudm39znsxgktpdde46sowmqhsxop-+jmbcuv7...@mail.gmail.com%3E

   This is a VOTE thread to check interest in the community for
adding REST Server implementation in Apache Kafka.

Thanks,
Harsha







Re: [VOTE] Add REST Server to Apache Kafka

2016-10-26 Thread Jan Filipiak

So happy to see this reply.
I think the same: HTTP actually makes it way harder to properly batch up
records, as Kafka core would need to know how to split your payload.

It would help people do the wrong thing, IMO.

best Jan

On 25.10.2016 23:58, Jay Kreps wrote:

-1

I think the REST server for Kafka that already exists is quite good and
getting contributions. Moving this into the core project doesn't solve a
problem that I see.

-Jay

On Tue, Oct 25, 2016 at 2:16 PM, Harsha Chintalapani 
wrote:


Hi All,
We are proposing to have a REST Server as part of  Apache Kafka
to provide producer/consumer/admin APIs. We Strongly believe having
REST server functionality with Apache Kafka will help a lot of users.
Here is the KIP that Mani Kumar wrote
https://cwiki.apache.org/confluence/display/KAFKA/KIP-
80:+Kafka+Rest+Server.
There is a discussion thread in dev list that had differing opinions on
whether to include REST server in Apache Kafka or not. You can read more
about that in this thread
http://mail-archives.apache.org/mod_mbox/kafka-dev/201610.mbox/%3CCAMVt_
aymqeudm39znsxgktpdde46sowmqhsxop-+jmbcuv7...@mail.gmail.com%3E

   This is a VOTE thread to check interest in the community for
adding REST Server implementation in Apache Kafka.

Thanks,
Harsha