Re: Python kafka client benchmarks

2016-06-15 Thread Dana Powers
Very nice!

On Wed, Jun 15, 2016 at 6:40 PM, John Dennison  wrote:
> My team has published a post comparing python kafka clients. Might be of
> interest to python users.
>
> http://activisiongamescience.github.io/2016/06/15/Kafka-Client-Benchmarking/


Python kafka client benchmarks

2016-06-15 Thread John Dennison
My team has published a post comparing python kafka clients. Might be of
interest to python users.

http://activisiongamescience.github.io/2016/06/15/Kafka-Client-Benchmarking/


Re: Embedding zookeeper and kafka in java process.

2016-06-15 Thread Flavio Junqueira

> On 15 Jun 2016, at 21:56, Subhash Agrawal  wrote:
> 
> [2016-06-15 13:39:39,808] DEBUG [ZkClient-EventThread-24-localhost:2181] 
> [Channel manager on controller 0]: Controller 0 trying to connect to broker 0 
> (kafka.controller.ControllerChannelManager)


The controller isn't able to connect to itself (as a broker)? It looks like 
ZK is triggering the event just fine, but the controller is having some trouble 
seeing itself as a broker.

-Flavio

Re: Configuring client-side timeouts in the Java Producer so that send() doesn't block

2016-06-15 Thread R Krishna
Any luck trying to figure out this problem?
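
In case it helps, here is a minimal Java sketch (an assumption on my part, not Samuel's code) of the 0.9 producer settings that bound how long send() can block: max.block.ms caps the metadata/buffer wait inside send(), request.timeout.ms caps the wait for a broker response, and a bounded get() keeps the caller from waiting on the Future forever.

```
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BoundedSendExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.33.20:9091");   // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("max.block.ms", "500");         // bound metadata/buffer waits inside send()
        props.put("request.timeout.ms", "1000");  // bound the wait for a broker response

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            ProducerRecord<byte[], byte[]> record =
                    new ProducerRecord<>("test", "Kafka Up.".getBytes());
            // Also bound the caller: don't wait on the Future forever.
            RecordMetadata meta = producer.send(record).get(2, TimeUnit.SECONDS);
            System.out.println("offset=" + meta.offset());
        }
    }
}
```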

On Wed, May 18, 2016 at 10:53 AM, Samuel Chase  wrote:

> Hello Ismael,
>
> On Wed, May 18, 2016 at 5:54 PM, Ismael Juma  wrote:
> > Your second example should work as well. Can you please include the code
> > you are using to test the scenario and what Kafka version you are using
> > (0.9.0.0 or 0.9.0.1, I guess)?
>
> This is the code I'm using:
>
> ```
> (with-open [p (producer {"bootstrap.servers" "192.168.33.20:9091"
>  "max.block.ms" "500"
>  "request.timeout.ms" "1000"
>  "metadata.fetch.timeout.ms" "1"}
> (byte-array-serializer)
> (byte-array-serializer))]
>   (.get (send p (record "test" (.getBytes "Kafka Up."))))
>   (Thread/sleep 1) ; While it sleeps, I shut down Kafka.
>   (.get (send p (record "test" (.getBytes "Kafka Down")))))
>
> ```
>
> Kafka version: 0.9.0.0
> clj-kafka version: 0.3.4
>
> Samuel
>



-- 
Radha Krishna, Proddaturi
253-234-5657


Embedding zookeeper and kafka in java process.

2016-06-15 Thread Subhash Agrawal
Hi All,
I am embedding Kafka 0.10.0 and the corresponding zookeeper in a java process. In 
this process, I start zookeeper first, wait for 10 seconds, and then start kafka.
These are all running in the same process. Toward the end of kafka startup, I see
the following exception. It seems zookeeper is not able to add the newly created
kafka instance. Have you seen this error before? I have only a single-node kafka.
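
For reference, the startup sequence looks roughly like the sketch below (a hedged reconstruction, not the actual code; directories, ports, and the Thread.sleep stand in for the real values):

```
import java.io.File;
import java.util.Properties;

import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;

public class EmbeddedKafkaSketch {
    public static void main(String[] args) throws Exception {
        // 1. Start an in-process ZooKeeper (directories and ports are placeholders).
        ZooKeeperServer zk = new ZooKeeperServer(new File("zk-snap"), new File("zk-log"), 2000);
        ServerCnxnFactory factory = ServerCnxnFactory.createFactory(2181, 60);
        factory.startup(zk);

        // 2. Wait before starting the broker in the same JVM.
        Thread.sleep(10_000);

        // 3. Start a single Kafka broker pointing at the embedded ZooKeeper.
        Properties props = new Properties();
        props.put("broker.id", "0");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("listeners", "PLAINTEXT://localhost:8392");
        props.put("log.dirs", "newkafka-logs");
        new KafkaServerStartable(new KafkaConfig(props)).startup();
    }
}
```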

Let me know if you have any suggestions. I will really appreciate any help on 
this.

Thanks
Subhash Agrawal.

[2016-06-15 13:39:39,616] INFO [Logserver_Starter] Registered broker 0 at path 
/brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(localhost,8392,PLAINTEXT) 
(kafka.utils.ZkUtils)
[2016-06-15 13:39:39,617] WARN [Logserver_Starter] No meta.properties file 
under dir C:\development \newkafka-logs\meta.properties 
(kafka.server.BrokerMetadataCheckpoint)
[2016-06-15 13:39:39,627] INFO [ZkClient-EventThread-24-localhost:2181] New 
leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-06-15 13:39:39,629] INFO [ZkClient-EventThread-24-localhost:2181] 
[BrokerChangeListener on Controller 0]: Broker change listener fired for path 
/brokers/ids with children 0 
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2016-06-15 13:39:39,638] INFO [Logserver_Starter] Kafka version : 0.10.0.0 
(org.apache.kafka.common.utils.AppInfoParser)
[2016-06-15 13:39:39,638] INFO [Logserver_Starter] Kafka commitId : 
b8642491e78c5a13 (org.apache.kafka.common.utils.AppInfoParser)
[2016-06-15 13:39:39,639] INFO [Logserver_Starter] [Kafka Server 0], started 
(kafka.server.KafkaServer)
[2016-06-15 13:39:39,806] INFO [ZkClient-EventThread-24-localhost:2181] 
[BrokerChangeListener on Controller 0]: Newly added brokers: 0, deleted 
brokers: , all live brokers: 0 
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2016-06-15 13:39:39,808] DEBUG [ZkClient-EventThread-24-localhost:2181] 
[Channel manager on controller 0]: Controller 0 trying to connect to broker 0 
(kafka.controller.ControllerChannelManager)
[2016-06-15 13:39:39,818] ERROR [ZkClient-EventThread-24-localhost:2181] 
[BrokerChangeListener on Controller 0]: Error while handling broker changes 
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)
scala.MatchError: null
at 
kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:122)
at 
kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:74)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$4.apply(ReplicaStateMachine.scala:372)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$4.apply(ReplicaStateMachine.scala:372)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:372)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)



Re: 10MB message

2016-06-15 Thread James Cheng
Igor,

This article talks about what to think about if putting large messages into 
Kafka: http://ingest.tips/2015/01/21/handling-large-messages-kafka/

The summary is that Kafka is not optimized for handling large messages, but if 
you really want to, it's possible to do it.
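
To make the knobs concrete, here is a hedged sketch of the settings usually involved for roughly 10 MB messages (the values are only illustrative, and the broker settings belong in server.properties rather than client code):

```
import java.util.Properties;

public class LargeMessageConfig {
    public static void main(String[] args) {
        int maxMessageBytes = 11 * 1024 * 1024; // a little headroom above 10 MB

        // Producer side: the request must be allowed to carry the largest record.
        Properties producerProps = new Properties();
        producerProps.put("max.request.size", String.valueOf(maxMessageBytes));
        producerProps.put("buffer.memory", String.valueOf(4 * maxMessageBytes));

        // Consumer side (new Java consumer): each partition fetch must cover the largest record.
        Properties consumerProps = new Properties();
        consumerProps.put("max.partition.fetch.bytes", String.valueOf(maxMessageBytes));

        // Broker side (server.properties):
        //   message.max.bytes=11534336
        //   replica.fetch.max.bytes=11534336
        System.out.println("producer=" + producerProps + " consumer=" + consumerProps);
    }
}
```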

That website is having issues right now, so it may take a (long) while to load. 
I've notified the website owner to let her know.

-James

> On Jun 14, 2016, at 1:33 AM, R Krishna  wrote:
> 
> There are options to compress on the wire and in the topic.
> 
> On Tue, May 31, 2016 at 8:35 AM, Igor Kravzov 
> wrote:
> 
>> In our system some data can be as big as 10MB.
>> Is it OK to send 10 MB message through Kafka?  What configuration
>> parameters should I check/set?
>> It is going to be one topic with one consumer - Apache NiFi GetKafka
>> processor.
>> Is one partition enough?
>> 
> 
> 
> 
> -- 
> Radha Krishna, Proddaturi
> 253-234-5657



Re: Kafka broker slow down when consumer try to fetch large messages from topic

2016-06-15 Thread R Krishna
Prateek, hope you looked at compression?
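
A hedged sketch of the consumer-side tuning Tom suggests below: fetch.message.max.bytes applies to the old (Scala) consumer, max.partition.fetch.bytes is the new Java consumer's equivalent, and the values are only illustrative.

```
import java.util.Properties;

public class LargeFetchConsumerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "large-message-group");  // placeholder
        // Old (Scala) consumer: must be at least the broker's message.max.bytes.
        props.put("fetch.message.max.bytes", String.valueOf(10 * 1024 * 1024));
        // New Java consumer equivalent:
        // props.put("max.partition.fetch.bytes", String.valueOf(10 * 1024 * 1024));
        System.out.println(props);
    }
}
```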

On Thu, Jun 2, 2016 at 10:26 AM, Tom Crayford  wrote:

> The article says ideal is about 10KB, which holds up well with what we've
> seen in practice as well.
>
> On Thu, Jun 2, 2016 at 6:25 PM, prateek arora 
> wrote:
>
> > Hi
> > Thanks for the information .
> >
> > I have one question :
> >
> > Right now in my scenario the maximum message size is around 800KB. Should we
> > consider these messages to be in the large-size category? The article talks
> > about 10-100 MB data.
> >
> > Regards
> > Prateek
> >
> >
> >
> >
> >
> >
> > On Thu, Jun 2, 2016 at 6:54 AM, Tom Crayford  wrote:
> >
> > > Hi there,
> > >
> > > Firstly, a note that Kafka isn't really designed for this kind of large
> > > message. http://ingest.tips/2015/01/21/handling-large-messages-kafka/
> > > covers a lot of tips around this use case however, and covers some tuning
> > > that will likely improve your usage.
> > >
> > > In particular, I expect tuning up fetch.message.max.bytes on the consumer
> > > to help out a lot here.
> > >
> > > Generally though, doing large messages will lead to very low throughput and
> > > lots of stability issues, as noted in that article. We run thousands of
> > > clusters in production, and typically recommend folk keep message sizes
> > > down to the few tens of KB for most use cases.
> > >
> > > Thanks
> > >
> > > Tom Crayford
> > > Heroku Kafka
> > >
> > > On Wed, Jun 1, 2016 at 9:49 PM, prateek arora <prateek.arora...@gmail.com> wrote:
> > >
> > > > I have a 4-node kafka cluster with the following configuration:
> > > >
> > > > Default Number of Partitions  : num.partitions : 1
> > > > Default Replication Factor : default.replication.factor : 1
> > > > Maximum Message Size : message.max.bytes : 10 MB
> > > > Replica Maximum Fetch Size : replica.fetch.max.bytes : 10 MB
> > > >
> > > >
> > > > Right now I have 4 topics, each with 1 partition and a replication factor of 1.
> > > >
> > > > "Topic Name" : "Broker Id" :  "Total Messages Received Across Kafka
> > > > Broker" : "Total Bytes Received Across Kafka Broker"
> > > > Topic 1  - Leader Kafka Broker 1 :  4.67 Message/Second  :  1.6
> > MB/second
> > > > Topic 2  - Leader Kafka Broker 2 :  4.78 Message/Second  :  4.1
> > MB/second
> > > > Topic 3  - Leader Kafka Broker 1 :  4.83  Message/Second   : 1.6
> > > MB/second
> > > > Topic 4  - Leader Kafka Broker 3  : 4.8 Message/Second   :   4.3
> > > MB/second
> > > >
> > > > Message consist of .
> > > >
> > > >
> > > > When a consumer tried to read messages from "Topic 2", the Kafka broker's rate
> > > > of message receiving slowed down from 4.77 messages/second to 3.12
> > > > messages/second, and after some time it tried to go back up.
> > > >
> > > > I also attached a screenshot of "Total Messages Received Across Kafka
> > > > Broker" and "Total Bytes Received Across Kafka Broker" for topic "Topic 2".
> > > >
> > > > Can someone explain why this happens and how to solve it?
> > > >
> > > > Regards
> > > > Prateek
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > >
> >
>



-- 
Radha Krishna, Proddaturi
253-234-5657


Re: ConsoleProducer missing messages (random behavior)

2016-06-15 Thread Vahid S Hashemian
I believe this issue is similar to the one reported here: 
https://issues.apache.org/jira/browse/KAFKA-3129.

--Vahid
 



From:   Dean Arnold 
To: users@kafka.apache.org
Date:   06/15/2016 11:27 AM
Subject: Re: ConsoleProducer missing messages (random behavior)



btw, it appears the missing msgs are at the end of the CSV file, so maybe
the producer doesn't properly flush when it gets EOF on stdin ?

On Wed, Jun 15, 2016 at 11:21 AM, Dean Arnold  wrote:

> I'm seeing similar issues with 0.9.0.1.
>
> I'm feeding CSV records (65536 total, 1 record per msg) to the console
> producer, which are consumed via a sink connector (using connect-standalone
> and a single partition). The sink occasionally reports flushing less than
> 65536 msgs via the sink flush(). Restarting the sink connector with a
> forced reset to offset 0 (ie, replaying all the msgs on the topic) shows
> that the messages are still missing (ie, no gaps in offsets), so I assume
> the msgs must be lost by the producer ?
>
>
> On Wed, Jun 15, 2016 at 1:29 AM, Radu Radutiu  wrote:
>
>> Hi,
>>
>> I was following the Quickstart guide and I have noticed that
>> ConsoleProducer does not publish all messages (the number of messages
>> published differs from one run to another) and happens mostly on a fresh
>> started broker.
>> version: kafka_2.11-0.10.0.0
>> OS: Linux (Ubuntu 14.04, Centos 7.2)
>> JDK: java version "1.7.0_101"
>> OpenJDK Runtime Environment (IcedTea 2.6.6)
>> (7u101-2.6.6-0ubuntu0.14.04.1),
>> openjdk version "1.8.0_91"
>> OpenJDK Runtime Environment (build 1.8.0_91-b14)
>>
>>
>> How to reproduce:
>> - start zookeeper:
>> ~/work/kafka_2.11-0.10.0.0$ bin/zookeeper-server-start.sh
>> config/zookeeper.properties &
>>
>> -start kafka:
>> ~/work/kafka_2.11-0.10.0.0$ bin/kafka-server-start.sh
>> config/server.properties &
>>
>> -start console consumer (topic test1 is already created):
>> ~/work/kafka_2.11-0.10.0.0$ bin/kafka-console-consumer.sh
>> --bootstrap-server localhost:9092 -topic test1 --zookeeper localhost:2181
>>
>> -in another terminal start console producer with the LICENSE file in kafka
>> directory as input:
>> ~/work/kafka_2.11-0.10.0.0$ bin/kafka-console-producer.sh --topic test1
>> --broker-list localhost:9092   >
>> The last line in the console consumer output is not the last line in the
>> LICENSE file for the first few runs of the console producer. If I use the
>> --old-producer parameter, all the lines in the LICENSE file are published
>> (and appear in the console consumer output). Different runs of console
>> producer with the same input file publish different number of lines
>> (sometimes all, sometimes only 182 lines out of 330). I've noticed that if
>> the kafka server was started a long time ago the console producer
>> publishes
>> all lines.
>> I have checked the kafka binary log file (in my case
>> /tmp/kafka-logs/test1-0/.log ) and confirmed that the
>> messages are not published (the console consumer receives all the
>> messages).
>>
>> Is there an explanation for this behavior?
>>
>> Best regards,
>> Radu
>>
>
>






Re: About null keys

2016-06-15 Thread Eno Thereska
Hi Adrienne,

How do you enter the input on t1 topic? If using kafka-console-producer, you'll 
need to pass in keys as well as values. Here is an example: 
http://www.shayne.me/blog/2015/2015-06-25-everything-about-kafka-part-2/ 
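
If you would rather assign keys inside the application instead, here is a rough sketch against the 0.10 Streams API (written from memory, so treat it as an assumption; the "t1-keyed" topic name is made up): derive a key from the value with map(), write the keyed stream back out, and read that topic as a KTable.

```
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class KeyedFromValue {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "t1");

        // Use the first word of the value as the key (any KeyValueMapper would do).
        KStream<String, String> keyed =
                source.map((key, value) -> new KeyValue<>(value.split(" ")[0], value));

        // Re-materialize as a keyed topic, then read it back as a KTable with non-null keys.
        keyed.to(Serdes.String(), Serdes.String(), "t1-keyed");
        KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), "t1-keyed");

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "null-key-example");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        new KafkaStreams(builder, config).start();
    }
}
```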


Thanks
Eno

> On 15 Jun 2016, at 17:21, Adrienne Kole  wrote:
> 
> Hi community,
> 
> Probably it is very basic question, as I am new to Kafka Streams.
> I am trying to initialize KTable or KStream from  kafka topic. However, I
> don't know how to avoid getting null keys. So,
> 
>  KTable source =
> builder.stream(Serdes.String(),Serdes.String(),"t1");
>  source.print();
> 
> I enter the input on t1 topic as:
> 
> one
> two
> one one
> 
> and get as a result:
> 
> null, (one <-null)
> null, (two <-null)
> null, (one one <-null)
> 
> 
> So, the key is always null.
> 
> Is the key supposed to initialize after getting input from topic? If yes,
> how can I initialize key in KTable after getting input? For example in
> KStream we have KeyValueMapper that can be used to assign new keys and
> values. However, in KTable we have only ValueMappers.
> 
> If not, how can I initialize the key of KTable from the input  from given
> topic?
> 
> 
> Cheers
> Adrienne



Re: ConsoleProducer missing messages (random behavior)

2016-06-15 Thread Dean Arnold
btw, it appears the missing msgs are at the end of the CSV file, so maybe
the producer doesn't properly flush when it gets EOF on stdin ?
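
For what it's worth, a minimal sketch (not the console producer's actual code) of why an explicit flush matters: records accepted by send() only sit in the client's buffer, and if the process exits before flush()/close() completes they can be dropped without any error.

```
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FlushOnEof {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
            String line;
            while ((line = in.readLine()) != null) {
                producer.send(new ProducerRecord<>("test1", line)); // async: only buffered here
            }
            producer.flush(); // block until everything buffered has been sent and acked
        } // close() also flushes, but the explicit call makes the intent visible
    }
}
```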

On Wed, Jun 15, 2016 at 11:21 AM, Dean Arnold  wrote:

> I'm seeing similar issues with 0.9.0.1.
>
> I'm feeding CSV records (65536 total, 1 record per msg) to the console
> producer, which are consumed via a sink connector (using connect-standalone
> and a single partition). The sink occasionally reports flushing less than
> 65536 msgs via the sink flush(). Restarting the sink connector with a
> forced reset to offset 0 (ie, replaying all the msgs on the topic) shows
> that the messages are still missing (ie, no gaps in offsets), so I assume
> the msgs must be lost by the producer ?
>
>
> On Wed, Jun 15, 2016 at 1:29 AM, Radu Radutiu  wrote:
>
>> Hi,
>>
>> I was following the Quickstart guide and I have noticed that
>> ConsoleProducer does not publish all messages (the number of messages
>> published differs from one run to another) and happens mostly on a fresh
>> started broker.
>> version: kafka_2.11-0.10.0.0
>> OS: Linux (Ubuntu 14.04, Centos 7.2)
>> JDK: java version "1.7.0_101"
>> OpenJDK Runtime Environment (IcedTea 2.6.6)
>> (7u101-2.6.6-0ubuntu0.14.04.1),
>> openjdk version "1.8.0_91"
>> OpenJDK Runtime Environment (build 1.8.0_91-b14)
>>
>>
>> How to reproduce:
>> - start zookeeper:
>> ~/work/kafka_2.11-0.10.0.0$ bin/zookeeper-server-start.sh
>> config/zookeeper.properties &
>>
>> -start kafka:
>> ~/work/kafka_2.11-0.10.0.0$ bin/kafka-server-start.sh
>> config/server.properties &
>>
>> -start console consumer (topic test1 is already created):
>> ~/work/kafka_2.11-0.10.0.0$ bin/kafka-console-consumer.sh
>> --bootstrap-server localhost:9092 -topic test1 --zookeeper localhost:2181
>>
>> -in another terminal start console producer with the LICENSE file in kafka
>> directory as input:
>> ~/work/kafka_2.11-0.10.0.0$ bin/kafka-console-producer.sh --topic test1
>> --broker-list localhost:9092   >
>> The last line in the console consumer output is not the last line in the
>> LICENSE file for the first few runs of the console producer. If I use the
>> --old-producer parameter, all the lines in the LICENSE file are published
>> (and appear in the console consumer output). Different runs of console
>> producer with the same input file publish different number of lines
>> (sometimes all, sometimes only 182 lines out of 330). I've noticed that if
>> the kafka server was started a long time ago the console producer
>> publishes
>> all lines.
>> I have checked the kafka binary log file (in my case
>> /tmp/kafka-logs/test1-0/.log ) and confirmed that the
>> messages are not published (the console consumer receives all the
>> messages).
>>
>> Is there an explanation for this behavior?
>>
>> Best regards,
>> Radu
>>
>
>


Re: ConsoleProducer missing messages (random behavior)

2016-06-15 Thread Dean Arnold
I'm seeing similar issues with 0.9.0.1.

I'm feeding CSV records (65536 total, 1 record per msg) to the console
producer, which are consumed via a sink connector (using connect-standalone
and a single partition). The sink occasionally reports flushing less than
65536 msgs via the sink flush(). Restarting the sink connector with a
forced reset to offset 0 (ie, replaying all the msgs on the topic) shows
that the messages are still missing (ie, no gaps in offsets), so I assume
the msgs must be lost by the producer ?


On Wed, Jun 15, 2016 at 1:29 AM, Radu Radutiu  wrote:

> Hi,
>
> I was following the Quickstart guide and I have noticed that
> ConsoleProducer does not publish all messages (the number of messages
> published differs from one run to another) and happens mostly on a fresh
> started broker.
> version: kafka_2.11-0.10.0.0
> OS: Linux (Ubuntu 14.04, Centos 7.2)
> JDK: java version "1.7.0_101"
> OpenJDK Runtime Environment (IcedTea 2.6.6) (7u101-2.6.6-0ubuntu0.14.04.1),
> openjdk version "1.8.0_91"
> OpenJDK Runtime Environment (build 1.8.0_91-b14)
>
>
> How to reproduce:
> - start zookeeper:
> ~/work/kafka_2.11-0.10.0.0$ bin/zookeeper-server-start.sh
> config/zookeeper.properties &
>
> -start kafka:
> ~/work/kafka_2.11-0.10.0.0$ bin/kafka-server-start.sh
> config/server.properties &
>
> -start console consumer (topic test1 is already created):
> ~/work/kafka_2.11-0.10.0.0$ bin/kafka-console-consumer.sh
> --bootstrap-server localhost:9092 -topic test1 --zookeeper localhost:2181
>
> -in another terminal start console producer with the LICENSE file in kafka
> directory as input:
> ~/work/kafka_2.11-0.10.0.0$ bin/kafka-console-producer.sh --topic test1
> --broker-list localhost:9092   
> The last line in the console consumer output is not the last line in the
> LICENSE file for the first few runs of the console producer. If I use the
> --old-producer parameter, all the lines in the LICENSE file are published
> (and appear in the console consumer output). Different runs of console
> producer with the same input file publish different number of lines
> (sometimes all, sometimes only 182 lines out of 330). I've noticed that if
> the kafka server was started a long time ago the console producer publishes
> all lines.
> I have checked the kafka binary log file (in my case
> /tmp/kafka-logs/test1-0/.log ) and confirmed that the
> messages are not published (the console consumer receives all the
> messages).
>
> Is there an explanation for this behavior?
>
> Best regards,
> Radu
>


ConsoleProducer missing messages (random behavior)

2016-06-15 Thread Radu Radutiu
Hi,

I was following the Quickstart guide and I have noticed that
ConsoleProducer does not publish all messages (the number of messages
published differs from one run to another) and happens mostly on a fresh
started broker.
version: kafka_2.11-0.10.0.0
OS: Linux (Ubuntu 14.04, Centos 7.2)
JDK: java version "1.7.0_101"
OpenJDK Runtime Environment (IcedTea 2.6.6) (7u101-2.6.6-0ubuntu0.14.04.1),
openjdk version "1.8.0_91"
OpenJDK Runtime Environment (build 1.8.0_91-b14)


How to reproduce:
- start zookeeper:
~/work/kafka_2.11-0.10.0.0$ bin/zookeeper-server-start.sh
config/zookeeper.properties &

-start kafka:
~/work/kafka_2.11-0.10.0.0$ bin/kafka-server-start.sh
config/server.properties &

-start console consumer (topic test1 is already created):
~/work/kafka_2.11-0.10.0.0$ bin/kafka-console-consumer.sh
--bootstrap-server localhost:9092 -topic test1 --zookeeper localhost:2181

-in another terminal start console producer with the LICENSE file in kafka
directory as input:
~/work/kafka_2.11-0.10.0.0$ bin/kafka-console-producer.sh --topic test1
--broker-list localhost:9092   

About null keys

2016-06-15 Thread Adrienne Kole
Hi community,

Probably it is a very basic question, as I am new to Kafka Streams.
I am trying to initialize a KTable or KStream from a kafka topic. However, I
don't know how to avoid getting null keys. So,

  KTable source =
builder.stream(Serdes.String(),Serdes.String(),"t1");
  source.print();

I enter the input on t1 topic as:

one
two
one one

and get as a result:

null, (one <-null)
null, (two <-null)
null, (one one <-null)


So, the key is always null.

Is the key supposed to be initialized after getting input from the topic? If yes,
how can I initialize the key in a KTable after getting input? For example, in
KStream we have KeyValueMapper, which can be used to assign new keys and
values. However, in KTable we have only ValueMappers.

If not, how can I initialize the key of a KTable from the input from the given
topic?


Cheers
Adrienne


Re: ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks

2016-06-15 Thread VG
Apologies. Sent to the wrong mailing list.



On Wed, Jun 15, 2016 at 7:48 PM, VG  wrote:

> I have a very simple driver which loads a textFile and filters a
> sub-string from each line in the textfile.
> When the collect action is executed , I am getting an exception.   (The
> file is only 90 MB - so I am confused what is going on..) I am running on a
> local standalone cluster
>
> 16/06/15 19:45:22 INFO BlockManagerInfo: Removed broadcast_2_piece0 on
> 192.168.56.1:56413 in memory (size: 2.5 KB, free: 2.4 GB)
> 16/06/15 19:45:22 INFO BlockManagerInfo: Removed broadcast_1_piece0 on
> 192.168.56.1:56413 in memory (size: 1900.0 B, free: 2.4 GB)
> 16/06/15 19:45:22 INFO BlockManagerInfo: Added rdd_2_1 on disk on
> 192.168.56.1:56413 (size: 2.7 MB)
> 16/06/15 19:45:22 INFO MemoryStore: Block taskresult_7 stored as bytes in
> memory (estimated size 2.7 MB, free 2.4 GB)
> 16/06/15 19:45:22 INFO BlockManagerInfo: Added taskresult_7 in memory on
> 192.168.56.1:56413 (size: 2.7 MB, free: 2.4 GB)
> 16/06/15 19:45:22 INFO Executor: Finished task 1.0 in stage 2.0 (TID 7).
> 2823777 bytes result sent via BlockManager)
> 16/06/15 19:45:22 INFO TaskSetManager: Starting task 2.0 in stage 2.0 (TID
> 8, localhost, partition 2, PROCESS_LOCAL, 5422 bytes)
> 16/06/15 19:45:22 INFO Executor: Running task 2.0 in stage 2.0 (TID 8)
> 16/06/15 19:45:22 INFO HadoopRDD: Input split:
> file:/C:/Users/i303551/Downloads/ariba-logs/ssws/access.2016.04.26/access.2016.04.26:67108864+25111592
> 16/06/15 19:45:22 INFO BlockManagerInfo: Added rdd_2_2 on disk on
> 192.168.56.1:56413 (size: 2.0 MB)
> 16/06/15 19:45:22 INFO MemoryStore: Block taskresult_8 stored as bytes in
> memory (estimated size 2.0 MB, free 2.4 GB)
> 16/06/15 19:45:22 INFO BlockManagerInfo: Added taskresult_8 in memory on
> 192.168.56.1:56413 (size: 2.0 MB, free: 2.4 GB)
> 16/06/15 19:45:22 INFO Executor: Finished task 2.0 in stage 2.0 (TID 8).
> 2143771 bytes result sent via BlockManager)
> 16/06/15 19:45:43 ERROR RetryingBlockFetcher: Exception while beginning
> fetch of 1 outstanding blocks
> java.io.IOException: Failed to connect to /192.168.56.1:56413
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
> at
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
> at
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
> at
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> at java.lang.Thread.run(Unknown Source)
> Caused by: java.net.ConnectException: Connection timed out: no further
> information: /192.168.56.1:56413
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> ... 1 more
> 16/06/15 19:45:43 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1
> outstanding blocks after 5000 ms
> 16/06/15 19:46:04 ERROR RetryingBlockFetcher: Exception while beginning
> fetch of 1 outstanding blocks
> java.io.IOException: Failed to connect to /192.168.56.1:56413
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
> at
> 

ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks

2016-06-15 Thread VG
I have a very simple driver which loads a textFile and filters a sub-string
from each line in the textfile.
When the collect action is executed, I am getting an exception. (The
file is only 90 MB, so I am confused about what is going on.) I am running on a
local standalone cluster.

16/06/15 19:45:22 INFO BlockManagerInfo: Removed broadcast_2_piece0 on
192.168.56.1:56413 in memory (size: 2.5 KB, free: 2.4 GB)
16/06/15 19:45:22 INFO BlockManagerInfo: Removed broadcast_1_piece0 on
192.168.56.1:56413 in memory (size: 1900.0 B, free: 2.4 GB)
16/06/15 19:45:22 INFO BlockManagerInfo: Added rdd_2_1 on disk on
192.168.56.1:56413 (size: 2.7 MB)
16/06/15 19:45:22 INFO MemoryStore: Block taskresult_7 stored as bytes in
memory (estimated size 2.7 MB, free 2.4 GB)
16/06/15 19:45:22 INFO BlockManagerInfo: Added taskresult_7 in memory on
192.168.56.1:56413 (size: 2.7 MB, free: 2.4 GB)
16/06/15 19:45:22 INFO Executor: Finished task 1.0 in stage 2.0 (TID 7).
2823777 bytes result sent via BlockManager)
16/06/15 19:45:22 INFO TaskSetManager: Starting task 2.0 in stage 2.0 (TID
8, localhost, partition 2, PROCESS_LOCAL, 5422 bytes)
16/06/15 19:45:22 INFO Executor: Running task 2.0 in stage 2.0 (TID 8)
16/06/15 19:45:22 INFO HadoopRDD: Input split:
file:/C:/Users/i303551/Downloads/ariba-logs/ssws/access.2016.04.26/access.2016.04.26:67108864+25111592
16/06/15 19:45:22 INFO BlockManagerInfo: Added rdd_2_2 on disk on
192.168.56.1:56413 (size: 2.0 MB)
16/06/15 19:45:22 INFO MemoryStore: Block taskresult_8 stored as bytes in
memory (estimated size 2.0 MB, free 2.4 GB)
16/06/15 19:45:22 INFO BlockManagerInfo: Added taskresult_8 in memory on
192.168.56.1:56413 (size: 2.0 MB, free: 2.4 GB)
16/06/15 19:45:22 INFO Executor: Finished task 2.0 in stage 2.0 (TID 8).
2143771 bytes result sent via BlockManager)
16/06/15 19:45:43 ERROR RetryingBlockFetcher: Exception while beginning
fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to /192.168.56.1:56413
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:105)
at
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
at
org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:546)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:76)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:57)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:56)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.net.ConnectException: Connection timed out: no further
information: /192.168.56.1:56413
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
16/06/15 19:45:43 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1
outstanding blocks after 5000 ms
16/06/15 19:46:04 ERROR RetryingBlockFetcher: Exception while beginning
fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to /192.168.56.1:56413
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:96)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at

RE: Kafka Connect HdfsSink and the Schema Registry

2016-06-15 Thread Tauzell, Dave
Thanks Ewan,

The second request was made by me directly. I'm trying to add this 
functionality into my .Net application. The library I'm using doesn't have an 
implementation of the AvroSerializer that interacts with the schema registry. 
I've now added code to make a POST to /subjects/<topic>-value with the 
schema. Something I noticed is that I was using a schema like this:

{
  "subject": "AuditHdfsTest5-value",
  "version": 1,
  "id": 5,
  "schema": 
"{\"type\":\"record\",\"name\":\"GenericAuditRecord\",\"namespace\":\"audit\",\"fields\":[{\"name\":\"xml\",\"type\":[\"string\",\"null\"]}]}"
}

When the connector got a message and did a lookup it didn't have the 
"namespace" field and the lookup failed.  I then posted a new version of the 
schema without the "namespace" field and it worked.

-Dave

Dave Tauzell | Senior Software Engineer | Surescripts
O: 651.855.3042 | www.surescripts.com |   dave.tauz...@surescripts.com
Connect with us: Twitter I LinkedIn I Facebook I YouTube


-Original Message-
From: Ewen Cheslack-Postava [mailto:e...@confluent.io]
Sent: Tuesday, June 14, 2016 6:59 PM
To: users@kafka.apache.org
Subject: Re: Kafka Connect HdfsSink and the Schema Registry

On Tue, Jun 14, 2016 at 8:08 AM, Tauzell, Dave  wrote:

> I have been able to get my C# client to put avro records to a Kafka
> topic and have the HdfsSink read and save them in files.  I am
> confused about the interaction with the registry.  The kafka message
> contains a schema id and I see the connector look that up in the
> registry.  Then it also looks up a subject which is <topic>-value.
>
> What is the relationship between the passed schema id and the subject
> which is derived from the topic name?
>

The HDFS connector doesn't work directly with the schema registry, the 
AvroConverter does. I'm not sure what the second request you're seeing is
-- normally it would only look up the schema ID in order to get the schema.
Where are you seeing the second request, and can you include some logs? I can't 
think of any other requests the AvroConverter would be making just for 
deserialization.

The subject names are generated by the serializer as <topic>-key and 
<topic>-value, and this is just the standardized approach Confluent's 
serializers use. The ID will have been registered under that subject.

-Ewen


>
> -Dave
>
> This e-mail and any files transmitted with it are confidential, may
> contain sensitive information, and are intended solely for the use of
> the individual or entity to whom they are addressed. If you have
> received this e-mail in error, please notify the sender by reply
> e-mail immediately and destroy all copies of the e-mail and any attachments.
>



--
Thanks,
Ewen
This e-mail and any files transmitted with it are confidential, may contain 
sensitive information, and are intended solely for the use of the individual or 
entity to whom they are addressed. If you have received this e-mail in error, 
please notify the sender by reply e-mail immediately and destroy all copies of 
the e-mail and any attachments.


Re: Blocked in KafkaConsumer.commitOffsets

2016-06-15 Thread Robert Metzger
Hi,

I've already looked at this issue on the Flink list and recommended that Hironori
post here. It seems that the consumer is not returning from the poll()
call, which is why the commitOffsets() method cannot enter the synchronized
block.
The KafkaConsumer is logging the following statements:

2016-06-10 20:29:53,677 INFO
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
the coordinator 2147482645 dead.
2016-06-10 20:29:53,678 INFO
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
the coordinator 2147482645 dead.
2016-06-10 20:29:53,679 INFO
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
the coordinator 2147482645 dead.

2016-06-10 20:56:53,982 INFO
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
the coordinator 2147482645 dead.


I guess that the poll() call is not returning within the given timeout
while trying to reconnect to the brokers?
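
For reference, the only KafkaConsumer call that is safe to make from another thread is wakeup(); a hedged sketch of how it breaks a blocked poll() is below (whether Flink's connector can be changed to use it here is a separate question).

```
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class WakeupSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "example-group");            // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("some-topic"));  // placeholder topic

        Thread poller = new Thread(() -> {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    // hand records to the processing pipeline ...
                }
            } catch (WakeupException e) {
                // expected: poll() was interrupted from another thread
            } finally {
                consumer.close();
            }
        });
        poller.start();

        // From another thread, e.g. when a commit has been waiting too long:
        Thread.sleep(30_000);
        consumer.wakeup();  // forces the blocked poll() above to throw WakeupException
        poller.join();
    }
}
```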


On Wed, Jun 15, 2016 at 2:41 PM, Hironori Ogibayashi 
wrote:

> Hello,
>
> I am running stream processing job with Kafka and Flink.
> Flink reads records from Kafka.
>
> My software versions are:
> - Kafka broker: 0.9.0.2.4 (HDP 2.4.0.0 version)
> - Kafka client library: 0.9.0.1
> - Flink: 1.0.3
>
> Now I have problem that Flink job is sometimes blocked and consumer lag
> is increasing.
> I got thread dump during the situation.
>
> This is the blocked thread. Looks like blocked in
> KafkaConsumer.commitOffsets.
>
> 
> "Async calls on Source: Custom Source -> Flat Map (2/3)" daemon
> prio=10 tid=0x7f2b14010800 nid=0x1b89a waiting for monitor entry
> [0x7f2b3ddfc000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
> - waiting to lock <0x000659111b58> (a
> org.apache.kafka.clients.consumer.KafkaConsumer)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
> - locked <0x000659111cc8> (a java.lang.Object)
> at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> ---
>
> And lock 0x000659111b58 is held by the following thread.
>
> ---
> "Thread-9" daemon prio=10 tid=0x7f2b2440d000 nid=0x1b838 runnable
> [0x7f2b3dbfa000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
> - locked <0x000659457dc8> (a sun.nio.ch.Util$2)
> - locked <0x000659457db8> (a
> java.util.Collections$UnmodifiableSet)
> - locked <0x000659457108> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
> at
> org.apache.kafka.common.network.Selector.select(Selector.java:425)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
> at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
> - locked <0x000659111b58> (a
> org.apache.kafka.clients.consumer.KafkaConsumer)
> ---
>
> I am wondering why Flink's kafka consumer is blocked and any advice
> would be appreciated.
>
> Thanks,
> Hironori Ogibayashi
>


Blocked in KafkaConsumer.commitOffsets

2016-06-15 Thread Hironori Ogibayashi
Hello,

I am running a stream processing job with Kafka and Flink.
Flink reads records from Kafka.

My software versions are:
- Kafka broker: 0.9.0.2.4 (HDP 2.4.0.0 version)
- Kafka client library: 0.9.0.1
- Flink: 1.0.3

Now I have a problem where the Flink job is sometimes blocked and consumer lag
is increasing.
I got a thread dump during the situation.

This is the blocked thread. Looks like blocked in KafkaConsumer.commitOffsets.


"Async calls on Source: Custom Source -> Flat Map (2/3)" daemon
prio=10 tid=0x7f2b14010800 nid=0x1b89a waiting for monitor entry
[0x7f2b3ddfc000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
- waiting to lock <0x000659111b58> (a
org.apache.kafka.clients.consumer.KafkaConsumer)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
- locked <0x000659111cc8> (a java.lang.Object)
at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
---

And lock 0x000659111b58 is held by the following thread.

---
"Thread-9" daemon prio=10 tid=0x7f2b2440d000 nid=0x1b838 runnable
[0x7f2b3dbfa000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
- locked <0x000659457dc8> (a sun.nio.ch.Util$2)
- locked <0x000659457db8> (a java.util.Collections$UnmodifiableSet)
- locked <0x000659457108> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
at org.apache.kafka.common.network.Selector.select(Selector.java:425)
at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
- locked <0x000659111b58> (a
org.apache.kafka.clients.consumer.KafkaConsumer)
---

I am wondering why Flink's kafka consumer is blocked and any advice
would be appreciated.

Thanks,
Hironori Ogibayashi


Re: async producer retry behavior - at least once guarantee

2016-06-15 Thread R Krishna
Increasing reconnect.backoff.ms=1000 ms and BLOCK_ON_BUFFER_FULL_CONFIG to
true did not help either. The messages are simply lost.

It is upsetting to find that there is no way to handle messages that are lost when
the broker itself is not available, and that retries do not cover broker
connection issues.
https://issues.apache.org/jira/browse/KAFKA-156

Slide 24 of
http://www.slideshare.net/jhols1/apache-kafka-reliability-guarantees-stratahadoop-nyc-2015
also shows that retries happen only if the drain response fails, but does
this include no response or a null response?

Let me try modifying some of these classes.
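
In the meantime, one application-level workaround (a hedged sketch, not anything the client provides): the producer never hands the original record back, but the send callback can close over it, so a record the client gave up on can be re-queued or dead-lettered by the application itself.

```
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ResendOnFailure {
    private final Queue<ProducerRecord<String, String>> retryQueue = new ConcurrentLinkedQueue<>();

    // Send asynchronously, but keep the record reachable from the callback so the
    // application can retry it after the client has given up.
    void sendWithRecovery(Producer<String, String> producer, ProducerRecord<String, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // metadata may be null here; the record itself is still in scope
                retryQueue.add(record);
            }
        });
    }
}
```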




On Mon, Jun 13, 2016 at 8:32 PM, R Krishna  wrote:

> As part of testing v0.9 Kafka's at-least-once guarantees, we tried
> disconnecting the producer's network and found that retries=1000 are not
> happening. We get a
>
> WARN  kafka-producer-network-thread | producer-1
> [.kafka.clients.producer.internals.Sender]  - Got error produce response
> with correlation id 6474 on topic-partition test-topic-3-100-38, retrying
> (999 attempts left). Error: NETWORK_EXCEPTION
>
> And
>
> org.apache.kafka.common.errors.TimeoutException: Batch Expired
>
> We tried debugging by putting a breakpoint in the Accumulator and
> RecordBatch classes to stop when batch.attempts > 1, but it never goes
> beyond a value of 1 where the batch is re-enqueued, even though canRetry()
> always returns true. Is there a better way to debug this?
> clients.producer.internals.Sender.completeBatch(RecordBatch, Errors, long,
> long, long)
>
> The producer decides to skip messages when there is a network issue; this
> was also verified by checking topic message counts.
>
> Also, the only option in an async send is a callback on completion, where
> even the RecordMetadata is empty (as expected, because there was no server
> communication). But how do we get the record itself after all the retries
> have happened so that nothing is lost?
> reconnect.backoff.ms = 100
>
> retry.backoff.ms = 100
> buffer.memory = 33554432
> timeout.ms = 3
> connections.max.idle.ms = 54
> max.in.flight.requests.per.connection = 5
> metrics.num.samples = 2
> request.timeout.ms = 5000
> acks = 1
> batch.size = 16384
> receive.buffer.bytes = 32768
> retries = 1000 
> max.request.size = 1048576
> metrics.sample.window.ms = 3
> send.buffer.bytes = 131072
> linger.ms = 10
> /*
>  * Produce a record without waiting for server. This includes a
> callback
>  * that will print an error if something goes wrong
>  */
> public static void produceAsync(Producer<String, String> producer,
> String topic, String key, String value) {
> ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, value);
> producer.send(record, new DemoProducerCallback());
> }
>
> public static class DemoProducerCallback implements Callback {
> @Override
> public void onCompletion(RecordMetadata recordMetadata, Exception
> e) {
> if (e != null) {
> System.out.println("Error producing to topic " +
> ((recordMetadata != null) ?
> recordMetadata.topic() : ""));
> e.printStackTrace();
> }
> }
> }
>



-- 
Radha Krishna, Proddaturi
253-234-5657


General Question About Kafka

2016-06-15 Thread ali
Hello Guys. 

 

We are going to install Apache Kafka in our local data center, and different
producers distributed across different locations will be connected
to this server.

Our producers will use an Internet connection and will send 10 MB data
packages every 30 seconds.

I was wondering, is Apache Kafka actually suited for my scenario? Since we
will use an Internet connection, should I be worried about network-related
problems such as performance and latency?

 

Thank you

Ali