Re: kafka cluster network bandwidth is too high

2016-12-07 Thread Matthias J. Sax
You cannot send images over the mailing list. They get automatically
removed.

On 12/6/16 11:55 PM, 陈超 wrote:
> Hi kafka developer,
> 
>  
> 
>  I have a Kafka cluster with 3 nodes, and it has 3 topics now. Not much
> data is going into the Kafka topics at the moment, but the bandwidth the
> nodes use to sync data to each other is up to 4 Mb/s. I don't know why it
> is so high. The pictures are below:
> 
>  
> 
> Iftop info:
> 
> [inline image removed by the mailing list]
> 
>  
> 
> ps -mp kafkapid -o THREAD,tid,time
> 
> [inline image removed by the mailing list]
> 
>  
> 
> Jstack info:
> 
> [inline image removed by the mailing list]
> 
>  
> 
> Can you help me find the problem?
> 
> Thank you very much!
> 





Re: Q about doc of consumer

2016-12-07 Thread Matthias J. Sax
You cannot send images over the mailing list... they get automatically
removed.

On 12/6/16 11:15 PM, paradixrain wrote:
> Dear kafka,
> I think there is an error in the document, is that right?
> 
> 
> Here's what I did:
> Step 1:
>  open a producer
> ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
> 
> Step 2:
> open a consumer
> ./kafka-console-consumer.sh --zookeeper localhost:9092 --topic test
> --from-beginning
> 
> Step 3:
> I input something in producer
> 
> but got errors below in consumer:
> 
> 
> Step 4:
> I changed the port in Step 2 from 9092 to 2181 and restarted the consumer;
> after that I got what I wanted
> 
> 
> --
> YOUR FRIEND,
> Ryan
>  





Re: Kafka Queue

2016-12-07 Thread Hans Jespersen
Are you setting the group.id property to the same value on both consumers?

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
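
For example, a minimal sketch (assuming the plain Java 0.10.x consumer, and a
topic with at least two partitions so there is actually work to split):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// Same group.id in every instance: partitions are divided among the group
// members, so each message is handled by only one consumer in the group.
props.put("group.id", "my-processing-group");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("myTopic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
}

Run the same code in two processes; with two different group.id values, both
consumers would instead receive every message, which matches the behaviour
described in the question.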

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Wed, Dec 7, 2016 at 12:36 PM, Justin Smith  wrote:

> I read this paragraph under Kafka as a Messaging System.
>
>
>
> “The consumer group concept in Kafka generalizes these two concepts. As
> with a queue the consumer group allows you to divide up processing over a
> collection of processes (the members of the consumer group). As with
> publish-subscribe, Kafka allows you to broadcast messages to multiple
> consumer groups.”
>
>
>
> But I cannot find where you are able to divide up processing over a
> collection of processors. I have 2 consumers and no matter what I do they
> always both get the same message. Is there a java reference you can point
> me to where I can understand how to accomplish this?
>
>
>
> Thanks a lot,
>
> *Justin Smith*
> *Software Developer*
>
>
> c: 904.497.3035 1300 Marsh Landing Pkwy., Ste. 105,  Jacksonville
> Beach, FL 32250
> clearsense.com    |   *jsm...@clearsense.com
> *   |   Follow on Twitter
>    |   Connect on LinkedIn
> 
>
>
>


Re: Kafka Queue

2016-12-07 Thread Sharninder
Have you set them to the same consumer group ID? That's what identifies
a consumer group.

On Thu, Dec 8, 2016 at 2:06 AM, Justin Smith  wrote:

> I read this paragraph under Kafka as a Messaging System.
>
>
>
> “The consumer group concept in Kafka generalizes these two concepts. As
> with a queue the consumer group allows you to divide up processing over a
> collection of processes (the members of the consumer group). As with
> publish-subscribe, Kafka allows you to broadcast messages to multiple
> consumer groups.”
>
>
>
> But I cannot find where you are able to divide up processing over a
> collection of processors. I have 2 consumers and no matter what I do they
> always both get the same message. Is there a java reference you can point
> me to where I can understand how to accomplish this?
>
>
>
> Thanks a lot,
>
> *Justin Smith*
> *Software Developer*
>
>
> c: 904.497.3035 <(904)%20497-3035> 1300 Marsh Landing Pkwy., Ste.
> 105,  Jacksonville Beach, FL 32250
> clearsense.com    |   *jsm...@clearsense.com
> *   |   Follow on Twitter
>    |   Connect on LinkedIn
> 
>
>
>



-- 
--
Sharninder


kafka cluster network bandwidth is too high

2016-12-07 Thread 陈超
Hi kafka developer,

 

 I have a Kafka cluster with 3 nodes, and it has 3 topics now. Not much
data is going into the Kafka topics at the moment, but the bandwidth the
nodes use to sync data to each other is up to 4 Mb/s. I don't know why it is
so high. The pictures are below:

 

Iftop info:

[inline image removed by the mailing list]

 

ps -mp kafkapid -o THREAD,tid,time

[inline image removed by the mailing list]

 

Jstack info:

[inline image removed by the mailing list]

 

Can you help me find the problem? 

Thank you very much!



Q about doc of consumer

2016-12-07 Thread paradixrain
Dear kafka,
I think there is an error in the document, is that right?


Here's what I did:
Step 1:
 open a producer
./kafka-console-producer.sh --broker-list localhost:9092 --topic test

Step 2:
open a consumer
./kafka-console-consumer.sh --zookeeper localhost:9092 --topic test 
--from-beginning

Step 3:
I input something in producer

but got errors below in consumer:


Step 4:
I changed the port in Step 2 from 9092 to 2181 and restarted the consumer;
after that I got what I wanted
 

--
YOUR FRIEND,
 Ryan

Problem with multiple joins in one topology

2016-12-07 Thread Brian Krahmer

Hey guys,

  I'm having a hell of a time here.  I've worked for days trying to get 
this joining pipeline working.  I thought I had it working last week, 
but my jubilation was premature.  The point was to take data in from 
five different topics and merge them together to obtain one enriched 
event (output to compacted topic).  Can anybody spot what I'm doing 
wrong?  The ordering makes no difference.  For example, I've switched 
the locationInput and the vehicleReservedInput inputs in the leftJoin 
calls below, and I get the same results.  The location part of the 
enrichment works while the vehicleReserved part does not.  I can't even 
think of how to restructure the topology without resorting to building 
my own lower-level topology.


thanks,
brian

KTable<String, VehicleFinderData> fleetInput =
    builder.table(Serdes.String(), vehicleFinderDataSerde,
                  FLEET_TOPIC, VEHICLE_ENRICHER_FLEET_STORE);

...
fleetInput.print("fleetInput");
locationInput.print("locationInput");
vehicleReservedInput.print("vehicleReservedInput");
vehicleReleasedInput.print("vehicleReleasedInput");
vehicleUsageEndedInput.print("vehicleUsageEndedInput");

KTable<String, VehicleFinderData> mergeStepOne =
    fleetInput.leftJoin(locationInput, VehicleFinderData::merge);

mergeStepOne.print("mergeStepOne");
KTable<String, VehicleFinderData> mergeStepTwo =
    mergeStepOne.leftJoin(vehicleReleasedInput, VehicleFinderData::merge);

mergeStepTwo.print("mergeStepTwo");
KTable<String, VehicleFinderData> mergeStepThree =
    mergeStepTwo.leftJoin(vehicleUsageEndedInput, VehicleFinderData::merge);

mergeStepThree.print("mergeStepThree");
KTable<String, VehicleFinderData> mergeStepFour =
    mergeStepThree.leftJoin(vehicleReservedInput, VehicleFinderData::merge);

mergeStepFour.print("mergeStepFour");

** Generate a location event **

[locationInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)

Deserializing from topic VehicleEnricherFleetStore
Merge operation called
[mergeStepOne]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)

Merge operation called
[mergeStepTwo]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)

Merge operation called
[mergeStepThree]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)

Merge operation called
[mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)


** New event correctly serialized **

---

** Generate a vehicleReserved event **

[vehicleReservedInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped 
json value}<-null)

[mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , (null<-null)

** NO EVENT **



Kafka Queue

2016-12-07 Thread Justin Smith
I read this paragraph under Kafka as a Messaging System.

"The consumer group concept in Kafka generalizes these two concepts. As with a 
queue the consumer group allows you to divide up processing over a collection 
of processes (the members of the consumer group). As with publish-subscribe, 
Kafka allows you to broadcast messages to multiple consumer groups."

But I cannot find where you are able to divide up processing over a collection 
of processors. I have 2 consumers and no matter what I do they always both get 
the same message. Is there a java reference you can point me to where I can 
understand how to accomplish this?

Thanks a lot,
Justin Smith
Software Developer
c: 904.497.3035 1300 Marsh Landing Pkwy., Ste. 105,  Jacksonville Beach, FL 
32250
clearsense.com   |   
jsm...@clearsense.com   |   Follow on 
Twitter   |   Connect on 
LinkedIn



Re: [VOTE] 0.10.1.1 RC0

2016-12-07 Thread Guozhang Wang
One note is that in this bug-fix release we also include artifacts built
with Scala 2.12.1, as a pre-alpha artifact for the Scala community to try
and test out (it is built with Java 8, while all other artifacts are built
with Java 7). We hope to formally add Scala 2.12 support in future minor
releases.


Guozhang

On Wed, Dec 7, 2016 at 2:46 PM, Guozhang Wang  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the first candidate for the release of Apache Kafka 0.10.1.1.
> This is a bug fix release and it includes fixes and improvements from 27
> JIRAs. See the release notes for more details:
>
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc0/RELEASE_NOTES.html
>
> *** Please download, test and vote by Monday, 13 December, 8am PT ***
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc0/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc0/javadoc/
>
> * Tag to be voted upon (off the 0.10.1 branch) is the 0.10.1.1-rc0 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
> 8b77507083fdd427ce81021228e7e346da0d814c
>
>
> Thanks,
> Guozhang
>



-- 
-- Guozhang


[VOTE] 0.10.1.1 RC0

2016-12-07 Thread Guozhang Wang
Hello Kafka users, developers and client-developers,

This is the first candidate for the release of Apache Kafka 0.10.1.1. This is
a bug fix release and it includes fixes and improvements from 27 JIRAs. See
the release notes for more details:

http://home.apache.org/~guozhang/kafka-0.10.1.1-rc0/RELEASE_NOTES.html

*** Please download, test and vote by Monday, 13 December, 8am PT ***

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~guozhang/kafka-0.10.1.1-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
http://home.apache.org/~guozhang/kafka-0.10.1.1-rc0/javadoc/

* Tag to be voted upon (off the 0.10.1 branch) is the 0.10.1.1-rc0 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=8b77507083fdd427ce81021228e7e346da0d814c


Thanks,
Guozhang


Re: RdKafka & setting offsets

2016-12-07 Thread Matt Howlett
Two ideas:

you could use a new consumer group id and set the TopicConfig property
"auto.offset.reset" to "smallest". Consumers in the new group will read
from the beginning on all partitions.

Alternatively, as an example of how to set the offset explicitly, you can
modify the AdvancedConsumer example in rdkafka-dotnet by changing line 62
to:
consumer.Assign(partitions.Select(p => new TopicPartitionOffset(p.Topic,
p.Partition, 0)).ToList());
The consumer will then read from offset 0 on all partitions.

Note how this example works: It first subscribes to a topic using the
.Subscribe method on EventConsumer. It then receives a partition allocation
from Kafka via the OnPartitionAssigned event. In order to actually start
consuming any messages, you must explicitly call the Assign method with a
list of TopicPartitionOffsets. You will typically call this with the list
passed to you in the OnPartitionAssigned event unmodified, but not
necessarily. Here we chose to explicitly set the offset from which to start
reading.

Side note (to aid understanding): it's also possible to not use Kafka's
built-in group management and to manage partition allocation yourself. To do
this, you'd not call .Subscribe or implement OnPartitionAssigned and
OnPartitionRevoked, and instead call the Assign method with your desired
TopicPartitionOffsets before calling consumer.Start().
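
(For comparison only, since the question is about rdkafka-dotnet: the plain
Java 0.10.x consumer exposes the same kind of control through assign() and
seek(). A minimal sketch, assuming a topic named "my-topic":)

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "replay-group");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Manual assignment instead of subscribe(): pick the partitions explicitly,
// then rewind each one before polling so the whole topic is re-read.
List<TopicPartition> partitions = consumer.partitionsFor("my-topic").stream()
        .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
        .collect(Collectors.toList());
consumer.assign(partitions);
consumer.seekToBeginning(partitions);   // or consumer.seek(tp, offset) per partition

ConsumerRecords<String, String> records = consumer.poll(1000);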



On Wed, Dec 7, 2016 at 12:53 PM, Doyle, Keith <
keith.do...@greenwayhealth.com> wrote:

>
>
> I’m attempting to set the offset for a RdKafka-dotnet consumer in order to
> re-read the topic, but I’ve not seen any documentation or examples that do
> this.   I saw a reference for librdkafka that seems to show a set_offset
> method off the TopicPartition class and an Offset property on the
> TopicPartitionOffset class, but instantiating the former in RdKafka-dotnet
> shows no such method, so I’m not sure it’s been extended to
> RdKafka-dotnet.   In addition, I have no idea how to go about using these
> TopicPartition/Offset classes in the context of the RdKafka EventConsumer,
> whether it should be passed in as a parameter to the Subscribe or Start
> methods, or to the Position method (and if so, when to invoke it—before or
> after the Subscribe & Start, etc…).
>
>
>
> Can anyone clarify how you’re supposed to set the offset on a topic in the
> context of RdKafka?
>
>
>
> --
>
>
>
>
>
>
> *Keith Doyle*  *|*  Senior Software Engineer
>
> *Greenway Health*  *|*  4301 W. Boy Scout Blvd., Suite 800, Tampa, FL
> 33607
>
> (702) 256-9911 office  *|*  GreenwayHealth.com
> 
>
>
>
> NOTICE: This e-mail message and all attachments transmitted with it may
> contain legally privileged and confidential information intended solely for
> the use of the addressee. If the reader of this message is not the intended
> recipient, you are hereby notified that any reading, dissemination,
> distribution, copying, or other use of this message or its attachments is
> strictly prohibited. If you have received this message in error, please
> notify the sender immediately by electronic mail and delete this message
> and all copies and backups thereof. Thank you. Greenway Health.
>


Re: Creating a connector with Kafka Connect Distributed returning 500 error

2016-12-07 Thread Konstantine Karantasis
The bug I was referring to was only in trunk for just a while. Thus, your
issue must be related to something else, even though the response statuses
are similar.

Let me know if you want to share a bigger and more detailed (DEBUG level at
least) snapshot of the parts of the logs that might be related to this
failure.

Cheers,
Konstantine

On Wed, Dec 7, 2016 at 11:15 AM, Phillip Mann  wrote:

> Hello Konstantine,
>
>
>
> Thanks for your reply.
>
>
>
> I am using Confluent 3.0.1 installed on my machine and our cluster.
> However, our AWS cluster has Confluent 3.1.1 installed so I will test with
> 3.1.1 client and cluster and see if this resolves the issue.  Additionally,
> I’ll use the debug levels if this does not resolve my issue.
>
>
>
> If not, I’ll explore the trunk repo but I would prefer to use stable
> versions of CP / Kafka that can be accessed with Maven.
>
>
>
> Thanks again.
>
>
>
> Phillip
>
>
>
> > Hi Phillip,
>
> >
>
> > may I ask which Kafka version did you use?
>
> >
>
> > trunk repo in Apache Kafka contained briefly a bug in Connect framework
>
> > (during the past week) that produced failures similar to the one you
>
> > describe (only in distributed mode). A fix has been pushed since
> yesterday.
>
> >
>
> > 3) Some useful step-by-step information is provided in the quickstart
> guide
>
> > here:
>
> > https://kafka.apache.org/quickstart#quickstart_kafkaconnect
>
> >
>
> > as well as in the documentation of Confluent:
>
> > http://docs.confluent.io/3.1.0/connect/quickstart.html#
>
> >
>
> > Alternatively, you might want to follow the quickstart guide of one of
> the
>
> > open source connectors, here:
>
> > http://docs.confluent.io/3.1.0/connect/connectors.html
>
> >
>
> > 2) From what you mention above, it seems more like that you're hitting
> this
>
> > temporary bug. But again that depends on which Kafka version you've been
>
> > using.
>
> >
>
> > 1) Generating logs, in one of the debug levels (e.g. DEBUG, TRACE) is
>
> > usually a useful source of information.
>
> > Alternatively you may chose to run Connect in debug mode by setting the
>
> > environment variable KAFKA_DEBUG and attaching a remote debugger to it
>
> > (such as IntelliJ's remote debugging capability). With respect to live
>
> > debugging, we are planning to post a step-by-step guide for Kafka and
> Kafka
>
> > Connect soon.
>
> >
>
> > Regards,
>
> > Konstantine
>
> >
>
> >> On Tue, Dec 6, 2016 at 11:22 AM, Phillip Mann  wrote:
>
> >>
>
> >> I am working on migrating from Camus to Kafka Connect. I am working on
> the
>
> >> implementation of Kafka Connect and specifically focused on distributed
>
> >> mode. I am able to start a worker successfully on my local machine
> which I
>
> >> assume communicates with my Kafka cluster. I am further able to run two
> GET
>
> >> commands such as / and /connector-plugins which return the correct JSON.
>
> >> However, when I try to POST a command to create a connector, I receive a
>
> >> 500 error and a time out. Specifically, I use this command to POST for
>
> >> testing:
>
> >>
>
> >> curl -X POST -H "Content-Type: application/json" --data '{"name":
>
> >> "local-file-sink", "config": {"connector.class":"
> FileStreamSinkConnector",
>
> >> "tasks.max":"1", "file":"test.sink.txt", "topics":"myTopic" }}'
>
> >> localhost:8083/connectors
>
> >>
>
> >> and eventually I get this response:
>
> >>
>
> >> {"error_code": 500, "message": "Request timed out"}
>
> >>
>
> >> I am lost as to what is going on. The logs from my Kafka Connect
>
> >> distributed worker show this:
>
> >>
>
> >> [2016-12-05 14:34:32,436] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:22:34:32
>
> >> +] "GET /connector-plugins HTTP/1.1" 200 315  2
>
> >> (org.apache.kafka.connect.runtime.rest.RestServer:60)
>
> >> [2016-12-05 15:05:25,422] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:05:25
>
> >> +] "GET /connector-plugins HTTP/1.1" 200 315  3
>
> >> (org.apache.kafka.connect.runtime.rest.RestServer:60)
>
> >> [2016-12-05 15:05:28,389] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:05:28
>
> >> +] "GET /connector-plugins HTTP/1.1" 200 315  2
>
> >> (org.apache.kafka.connect.runtime.rest.RestServer:60)
>
> >> [2016-12-05 15:07:38,644] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:06:08
>
> >> +] "GET /connectors HTTP/1.1" 500 48  90003
> (org.apache.kafka.connect.
>
> >> runtime.rest.RestServer:60)
>
> >> [2016-12-05 15:07:44,450] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:07:44
>
> >> +] "GET /connector-plugins HTTP/1.1" 200 315  1
>
> >> (org.apache.kafka.connect.runtime.rest.RestServer:60)
>
> >> [2016-12-05 15:13:06,703] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:11:36
>
> >> +] "POST /connectors HTTP/1.1" 500 48  90003
> (org.apache.kafka.connect.
>
> >> runtime.rest.RestServer:60)
>
> >> [2016-12-05 15:15:38,506] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:14:08
>
> >> +] "POST /connectors HTTP/1.1" 500 48  90005
> (org.apache.kafka.connect.
>
> >> runtime.rest.RestServer:60)
>
> >>
>
> >> where you can see the er

RdKafka & setting offsets

2016-12-07 Thread Doyle, Keith

I'm attempting to set the offset for a RdKafka-dotnet consumer in order to 
re-read the topic, but I've not seen any documentation or examples that do 
this.   I saw a reference for librdkafka that seems to show a set_offset method 
off the TopicPartition class and an Offset property on the TopicPartitionOffset 
class, but instantiating the former in RdKafka-dotnet shows no such method, so 
I'm not sure it's been extended to RdKafka-dotnet.   In addition, I have no 
idea how to go about using these TopicPartition/Offset classes in the context 
of the RdKafka EventConsumer, whether it should be passed in as a parameter to 
the Subscribe or Start methods, or to the Position method (and if so, when to 
invoke it, before or after the Subscribe & Start, etc.).

Can anyone clarify how you're supposed to set the offset on a topic in the 
context of RdKafka?

--


Keith Doyle  |  Senior Software Engineer
Greenway Health  |  4301 W. Boy Scout Blvd., Suite 800, Tampa, FL 33607
(702) 256-9911 office  |  GreenwayHealth.com

NOTICE: This e-mail message and all attachments transmitted with it may contain 
legally privileged and confidential information intended solely for the use of 
the addressee. If the reader of this message is not the intended recipient, you 
are hereby notified that any reading, dissemination, distribution, copying, or 
other use of this message or its attachments is strictly prohibited. If you 
have received this message in error, please notify the sender immediately by 
electronic mail and delete this message and all copies and backups thereof. 
Thank you. Greenway Health.


Re: Creating a connector with Kafka Connect Distributed returning 500 error

2016-12-07 Thread Phillip Mann
Hello Konstantine,

Thanks for your reply.

I am using Confluent 3.0.1 installed on my machine and our cluster.  However, 
our AWS cluster has Confluent 3.1.1 installed so I will test with 3.1.1 client 
and cluster and see if this resolves the issue.  Additionally, I’ll use the 
debug levels if this does not resolve my issue.

If not, I’ll explore the trunk repo but I would prefer to use stable versions 
of CP / Kafka that can be accessed with Maven.

Thanks again.

Phillip

> Hi Phillip,
>
> may I ask which Kafka version did you use?
>
> trunk repo in Apache Kafka contained briefly a bug in Connect framework
> (during the past week) that produced failures similar to the one you
> describe (only in distributed mode). A fix has been pushed since yesterday.
>
> 3) Some useful step-by-step information is provided in the quickstart guide
> here:
> https://kafka.apache.org/quickstart#quickstart_kafkaconnect
>
> as well as in the documentation of Confluent:
> http://docs.confluent.io/3.1.0/connect/quickstart.html#
>
> Alternatively, you might want to follow the quickstart guide of one of the
> open source connectors, here:
> http://docs.confluent.io/3.1.0/connect/connectors.html
>
> 2) From what you mention above, it seems more like that you're hitting this
> temporary bug. But again that depends on which Kafka version you've been
> using.
>
> 1) Generating logs, in one of the debug levels (e.g. DEBUG, TRACE) is
> usually a useful source of information.
> Alternatively you may chose to run Connect in debug mode by setting the
> environment variable KAFKA_DEBUG and attaching a remote debugger to it
> (such as IntelliJ's remote debugging capability). With respect to live
> debugging, we are planning to post a step-by-step guide for Kafka and Kafka
> Connect soon.
>
> Regards,
> Konstantine
>
>> On Tue, Dec 6, 2016 at 11:22 AM, Phillip Mann  wrote:
>>
>> I am working on migrating from Camus to Kafka Connect. I am working on the
>> implementation of Kafka Connect and specifically focused on distributed
>> mode. I am able to start a worker successfully on my local machine which I
>> assume communicates with my Kafka cluster. I am further able to run two GET
>> commands such as / and /connector-plugins which return the correct JSON.
>> However, when I try to POST a command to create a connector, I receive a
>> 500 error and a time out. Specifically, I use this command to POST for
>> testing:
>>
>> curl -X POST -H "Content-Type: application/json" --data '{"name":
>> "local-file-sink", "config": {"connector.class":"FileStreamSinkConnector",
>> "tasks.max":"1", "file":"test.sink.txt", "topics":"myTopic" }}'
>> localhost:8083/connectors
>>
>> and eventually I get this response:
>>
>> {"error_code": 500, "message": "Request timed out"}
>>
>> I am lost as to what is going on. The logs from my Kafka Connect
>> distributed worker show this:
>>
>> [2016-12-05 14:34:32,436] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:22:34:32
>> +] "GET /connector-plugins HTTP/1.1" 200 315  2
>> (org.apache.kafka.connect.runtime.rest.RestServer:60)
>> [2016-12-05 15:05:25,422] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:05:25
>> +] "GET /connector-plugins HTTP/1.1" 200 315  3
>> (org.apache.kafka.connect.runtime.rest.RestServer:60)
>> [2016-12-05 15:05:28,389] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:05:28
>> +] "GET /connector-plugins HTTP/1.1" 200 315  2
>> (org.apache.kafka.connect.runtime.rest.RestServer:60)
>> [2016-12-05 15:07:38,644] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:06:08
>> +] "GET /connectors HTTP/1.1" 500 48  90003 (org.apache.kafka.connect.
>> runtime.rest.RestServer:60)
>> [2016-12-05 15:07:44,450] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:07:44
>> +] "GET /connector-plugins HTTP/1.1" 200 315  1
>> (org.apache.kafka.connect.runtime.rest.RestServer:60)
>> [2016-12-05 15:13:06,703] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:11:36
>> +] "POST /connectors HTTP/1.1" 500 48  90003 (org.apache.kafka.connect.
>> runtime.rest.RestServer:60)
>> [2016-12-05 15:15:38,506] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:14:08
>> +] "POST /connectors HTTP/1.1" 500 48  90005 (org.apache.kafka.connect.
>> runtime.rest.RestServer:60)
>>
>> where you can see the error codes and the commands.
>>
>> I guess my main questions and issues are:
>>
>>   1.  How can I better debug Kafka Connect so I can try and fix this?
>>   2.  Is there anything that I'm doing that is glaringly wrong?
>>   3.  Is there any step-by-step documentation or blog posts on getting a
>> Kafka Connect distributed worker and connector to run? I have not really
>> seen anything or even best practices kinds of documentation? Maybe I am
>> just way too early of an adopter.
>>
>> I look forward to hearing back from the community and thank you for your
>> help!
>>
>>


Re: Massive SSL performance degredation

2016-12-07 Thread Aaron Wilkinson
The maintainer of librdkafka was able to reproduce the latency.  He thinks
it may be some sort of batching algorithm similar to Nagle inside OpenSSL.

Status of the issue is maintained at:
https://github.com/edenhill/librdkafka/issues/920

Thanks to all on this mailing list for your help in diagnosing.

- Aaron



On Fri, Nov 18, 2016 at 5:06 PM, Aaron Wilkinson 
wrote:

> So the kafka performance tools seem to indicate that the problem is not in
> the broker, but rather somewhere in librdkafka/OpenSSL.  I'm not completely
> sure I got the configs right to try and eliminate any batching
> considerations in the latency calculation (it seems like encrypting /
> decrypting a batch of 1000 messages 1 time would be more efficient than
> encrypting / decrypting 1 message 1000 times and I am interested in the
> latter).  BUT doing a relative test of plaintext vs ssl seemed to show the
> promised 20% - 50% overhead rather than the thousands of percent I am
> seeing with librdkafka + OpenSSL.
>
> Plaintext connection
> /usr/local/kafka/kafka_2.11-0.10.1.0$ bin/kafka-run-class.sh
> kafka.tools.EndToEndLatency :9092 latency.test 1 1 128
> /usr/local/kafka/kafka_2.11-0.10.1.0/config/client.properties
> ...
> Avg latency: 1.8739 ms
>
> SSL Connection
> bin/kafka-run-class.sh kafka.tools.EndToEndLatency :9093
> latency.test 1 1 128 /usr/local/kafka/kafka_2.11-0.
> 10.1.0/config/client-ssl.properties
> ...
> Avg latency: 2.4234 ms
>
> Also bin/kafka-producer-perf-test.sh --topic producer.latency.test
> --num-records 20 --record-size 128 --throughput 1 --producer.config
> /usr/local/kafka/kafka_2.11-0.10.1.0/config/client-ssl.properties
> 7 records sent, 1.3 records/sec (0.00 MB/sec), 94.7 ms avg latency, 591.0
> max latency.
> 6 records sent, 1.0 records/sec (0.00 MB/sec), 1.8 ms avg latency, 2.0 max
> latency.
> 5 records sent, 1.0 records/sec (0.00 MB/sec), 2.8 ms avg latency, 3.0 max
> latency.
> 20 records sent, 1.022809 records/sec (0.00 MB/sec), 34.75 ms avg latency,
> 591.00 ms max latency, 3 ms 50th, 591 ms 95th, 591 ms 99th, 591 ms 99.9th.
>
> Seems to show decent latency after the initial SSL handshake as well.
>
> So I will try to look harder at how librdkafka + OpenSSL are doing SSL.
> If I figure anything out, I'll do one last follow up email to save someone
> else with this stack a similar headache.
>
> Thanks for teaching me about the command line tools, guys!
> - Aaron
>
>
>
> On Fri, Nov 18, 2016 at 2:59 PM, Aaron Wilkinson 
> wrote:
>
>> Thank you both, Hans and Rajini.
>>
>> I will try out all the methods you suggested and report back.
>>
>> As an aside my investigation into the known, slow software implementation
>> of the GCM class of cipher algorithms in java 8 was a bust.  I tried all of
>> the default cipher suites common to OpenSSL (on the client) and java (on
>> the broker) and they all gave consistent (slow) results of about 40 ms per
>> hop.
>>
>> For posterity at the time of this writing those were (OpenSSL format):
>> DHE-DSS-AES256-GCM-SHA384
>> DHE-DSS-AES256-SHA256
>> DHE-DSS-AES256-SHA
>> DHE-DSS-AES128-GCM-SHA256
>> DHE-DSS-AES128-SHA256
>> DHE-DSS-AES128-SHA
>> EDH-DSS-DES-CBC3-SHA
>>
>> I can't guarantee that I'm not looking at a problem where the java crypto
>> module is not using hardware acceleration.  (I've verified that OpenSSL has
>> access to the aesni hardware instructions, but I have no idea how to tell
>> if the java crypto module is making use of them.)  However, it would appear
>> that it is at least not a problem specific to the GCM algorithm.
>>
>> - Aaron
>>
>>
>> On Fri, Nov 18, 2016 at 2:37 AM, Rajini Sivaram <
>> rajinisiva...@googlemail.com> wrote:
>>
>>> You can use the tools shipped with Kafka to measure latency.
>>>
>>> For latency at low load, run:
>>>
>>>
>>>- bin/kafka-run-class.sh kafka.tools.EndToEndLatency
>>>
>>>
>>> You may also find it useful to run producer performance test at different
>>> throughputs. The tool prints out latency as well:
>>>
>>>
>>>- bin/kafka-producer-perf-test.sh
>>>
>>>
>>> On Fri, Nov 18, 2016 at 1:25 AM, Hans Jespersen 
>>> wrote:
>>>
>>> > Publish lots of messages and measure in seconds or minutes. Otherwise
>>> you
>>> > are just benchmarking the initial SSL handshake setup time which should
>>> > normally be a one time overhead, not a per message overhead. If you
>>> just
>>> > send one message then of course SSL is much slower.
>>> >
>>> > -hans
>>> >
>>> > > On Nov 18, 2016, at 1:07 AM, Aaron Wilkinson >> >
>>> > wrote:
>>> > >
>>> > > Hi, Hans.  I was able to get the command line producer / consumer
>>> working
>>> > > with SSL but I'm not sure how to measure millisecond resolution
>>> latency
>>> > > with them.  I thought maybe the '--property print.timestamp=true'
>>> > argument
>>> > > would help, but only has second resolution.  Do you know of any way
>>> to
>>> > get
>>> > > the consumer to print out a receipt time-stamp with millisecond
>>> > > resolution?  Or of any extended documentation on the comma

Re: Consumer poll - no results

2016-12-07 Thread Mohit Anchlia
Is auto.offset.reset honored just the first time the consumer starts
polling? In other words, every time the consumer starts, does it start from
the beginning even if it has already read those messages?
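
(A minimal sketch of the property in question, assuming the plain Java
consumer; note that auto.offset.reset is only consulted when the group has no
committed offset for a partition, or the committed offset is out of range, so
a consumer that keeps the same group.id resumes from its last committed
position rather than re-reading from the beginning.)

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "DemoConsumer");
// Only applies when "DemoConsumer" has no committed offset for a partition
// (first run, or offsets expired/out of range); otherwise the consumer
// resumes from the committed offset.
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);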

On Wed, Dec 7, 2016 at 1:43 AM, Harald Kirsch 
wrote:

> Have you defined
>
> auto.offset.reset: earliest
>
> or otherwise made sure (KafkaConsumer.position()) that the consumer does
> not just wait for *new* messages to arrive?
>
> Harald.
>
>
>
> On 06.12.2016 20:11, Mohit Anchlia wrote:
>
>> I see this message in the logs:
>>
>> [2016-12-06 13:54:16,586] INFO [GroupCoordinator 0]: Preparing to
>> restabilize group DemoConsumer with old generation 3
>> (kafka.coordinator.GroupCoordinator)
>>
>>
>>
>> On Tue, Dec 6, 2016 at 10:53 AM, Mohit Anchlia 
>> wrote:
>>
>> I have a consumer polling a topic of Kafka 0.10. Even though the topic has
>>> messages the consumer poll is not fetching the message. The thread dump
>>> reveals:
>>>
>>> "main" #1 prio=5 os_prio=0 tid=0x7f3ba4008800 nid=0x798 runnable
>>> [0x7f3baa6c3000]
>>>java.lang.Thread.State: RUNNABLE
>>> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.
>>> java:93)
>>> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>> - locked <0x0006c6d1f8b8> (a sun.nio.ch.Util$3)
>>> - locked <0x0006c6d1f8a8> (a java.util.Collections$
>>> UnmodifiableSet)
>>> - locked <0x0006c6d1f0b8> (a sun.nio.ch.EPollSelectorImpl)
>>> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>> at org.apache.kafka.common.network.Selector.select(
>>> Selector.java:470)
>>> at org.apache.kafka.common.network.Selector.poll(
>>> Selector.java:286)
>>> at org.apache.kafka.clients.NetworkClient.poll(
>>> NetworkClient.java:260)
>>> at org.apache.kafka.clients.consumer.internals.
>>> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>>> - locked <0x0006c6d1eff8> (a org.apache.kafka.clients.
>>> consumer.internals.ConsumerNetworkClient)
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.
>>> pollOnce(KafkaConsumer.java:1031)
>>>
>>>
>>


Problem with multiple joins in one topology

2016-12-07 Thread Brian Krahmer

Hey guys,

  I'm having a hell of a time here.  I've worked for days trying to get 
this joining pipeline working.  I thought I had it working last week, 
but my jubilation was premature.  The point was to take data in from 
five different topics and merge them together to obtain one enriched 
event (output to compacted topic).  Can anybody spot what I'm doing 
wrong?  The ordering makes no difference.  For example, I've switched 
the locationInput and the vehicleReservedInput inputs in the leftJoin 
calls below, and I get the same results.  The location part of the 
enrichment works while the vehicleReserved part does not.  I can't even 
think of how to restructure the topology without resorting to building 
my own lower-level topology.


thanks,
brian


KTable<String, VehicleFinderData> fleetInput =
    builder.table(Serdes.String(), vehicleFinderDataSerde,
                  FLEET_TOPIC, VEHICLE_ENRICHER_FLEET_STORE);

...
fleetInput.print("fleetInput");
locationInput.print("locationInput");
vehicleReservedInput.print("vehicleReservedInput");
vehicleReleasedInput.print("vehicleReleasedInput");
vehicleUsageEndedInput.print("vehicleUsageEndedInput");

KTable<String, VehicleFinderData> mergeStepOne =
    fleetInput.leftJoin(locationInput, VehicleFinderData::merge);

mergeStepOne.print("mergeStepOne");
KTable<String, VehicleFinderData> mergeStepTwo =
    mergeStepOne.leftJoin(vehicleReleasedInput, VehicleFinderData::merge);

mergeStepTwo.print("mergeStepTwo");
KTable<String, VehicleFinderData> mergeStepThree =
    mergeStepTwo.leftJoin(vehicleUsageEndedInput, VehicleFinderData::merge);

mergeStepThree.print("mergeStepThree");
KTable<String, VehicleFinderData> mergeStepFour =
    mergeStepThree.leftJoin(vehicleReservedInput, VehicleFinderData::merge);

mergeStepFour.print("mergeStepFour");

** Generate a location event **

[locationInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)

Deserializing from topic VehicleEnricherFleetStore
Merge operation called
[mergeStepOne]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)

Merge operation called
[mergeStepTwo]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)

Merge operation called
[mergeStepThree]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)

Merge operation called
[mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)


** New event correctly serialized **

---

** Generate a vehicleReserved event **

[vehicleReservedInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped 
json value}<-null)

[mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , (null<-null)

** NO EVENT **



Re: Implementing custom key serializer

2016-12-07 Thread Guozhang Wang
I'm not sure why you observed that aggregation works ok if String typed key
is used. I think I agree with Radek that the problem comes from the value,
and here is my understanding:

1. The source stream read from the topic named "rtDetailLines" carries
RtDetailLogLine values.

2. After the map call, the result stream named "rtRekey" is of type
<AggKey, RtDetailLogLine>.

3. Then in aggregateByKey, when repartitioning is auto-executed (i.e. the
filter -> sink operators you saw in the stack trace), the default serdes
for type <AggKey, String> are used, and hence the value Serializer
failed to serialize an RtDetailLogLine object, which matches the error
message

"Exception in thread "StreamThread-1" java.lang.ClassCastException:
com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
java.lang.String"

as well. Note that although we are only "grouping by key", we will use the
value serde anyways for repartitioning.


Anyway, I double-checked the source code but cannot find any obvious bug
that would cause the default serdes not to be used.

Guozhang
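
(To make this concrete, here is a minimal sketch that reuses the class and
variable names already in this thread (AggKey, RtDetailLogLine,
JsonSerializer, JsonDeserializer, rtRekey) and passes both serdes explicitly,
so the auto-created repartition topic can serialize the key and the value:)

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KTable;

// Serdes built from the JsonSerializer/JsonDeserializer shown further down
// in this thread; AggKey and RtDetailLogLine are the poster's own classes.
Serde<AggKey> keySerde = Serdes.serdeFrom(
        new JsonSerializer<AggKey>(), new JsonDeserializer<AggKey>(AggKey.class));
Serde<RtDetailLogLine> valueSerde = Serdes.serdeFrom(
        new JsonSerializer<RtDetailLogLine>(),
        new JsonDeserializer<RtDetailLogLine>(RtDetailLogLine.class));

// rtRekey is the KStream<AggKey, RtDetailLogLine> produced by the map() call.
// Passing both serdes means the repartition topic writes AggKey keys and
// RtDetailLogLine values instead of falling back to the default serdes.
KTable<AggKey, RtDetailLogLine> latest =
        rtRekey.groupByKey(keySerde, valueSerde)
               .reduce((oldValue, newValue) -> newValue, "rekeyed-store");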


On Tue, Dec 6, 2016 at 12:40 PM, Jon Yeargers 
wrote:

> Here's the solution (props to Damian G)
>
> JsonSerializer<AggKey> keySerializer = new JsonSerializer<>();
> JsonDeserializer<AggKey> keyDeserializer = new
> JsonDeserializer<>(AggKey.class);
> Serde<AggKey> keySerde = Serdes.serdeFrom(keySerializer, keyDeserializer);
>
> then for the aggregator call 'groupByKey(keySerde, prtRecordSerde)'.
>
> In the documentation where it says the 'no param' groupByKey will use the
> default serializers - this doesn't seem to be true.
>
> On Tue, Dec 6, 2016 at 12:28 PM, Jon Yeargers 
> wrote:
>
> > Hmm. That's odd as the aggregation works ok if I use a String value for
> > the key (and the corresponding String serde).
> >
> > This error only started occurring when I tried to substitute my 'custom'
> > key for the original String.
> >
> > On Tue, Dec 6, 2016 at 12:24 PM, Radek Gruchalski 
> > wrote:
> >
> >> Yeah, I knew that already, this part of the error:
> >>
> >> > > >>> > > org.apache.kafka.streams.processor.internals.
> >> > RecordCollector.send(
> >> > > >>> > RecordCollector.java:73)
> >>
> >> points to this line: https://github.com/apache/
> kafka/blob/0.10.1/streams/
> >> src/main/java/org/apache/kafka/streams/processor/
> >> internals/RecordCollector.java#L73
> >>
> >> which means that your error happens on the value, not the key.
> >>
> >> –
> >> Best regards,
> >> Radek Gruchalski
> >> ra...@gruchalski.com
> >>
> >>
> >> On December 6, 2016 at 9:18:53 PM, Jon Yeargers (
> jon.yearg...@cedexis.com)
> >> wrote:
> >>
> >> 0.10.1.0
> >>
> >> On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski  >
> >> wrote:
> >>
> >> > Jon,
> >> >
> >> > Are you using 0.10.1 or 0.10.0.1?
> >> >
> >> > –
> >> > Best regards,
> >> > Radek Gruchalski
> >> > ra...@gruchalski.com
> >> >
> >> >
> >> > On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com)
> >> > wrote:
> >> >
> >> > Hi Jon,
> >> >
> >> > At a glance the code looks ok, i.e, i believe the aggregate() should
> >> have
> >> > picked up the default Serde set in your StreamsConfig. However, you
> >> could
> >> > try adding the Serdes to the groupBy(..)
> >> >
> >> > i.e.,
> >> > rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
> >> >
> >> > Thanks,
> >> > Damian
> >> >
> >> > On Tue, 6 Dec 2016 at 18:42 Jon Yeargers 
> >> wrote:
> >> >
> >> > > It's just a bunch of public 'int' and 'String' values. There's an
> >> empty
> >> > > constructor and a copy constructor.
> >> > >
> >> > > For functions I override 'equals' and the requirements for 'serde'
> >> > (close,
> >> > > configure, serializer and deserializer).
> >> > >
> >> > > @Override
> >> > > public Serializer serializer() {
> >> > > JsonSerializer jsonSerializer = new JsonSerializer<>();
> >> > > return jsonSerializer;
> >> > > }
> >> > >
> >> > > @Override
> >> > > public Deserializer deserializer() {
> >> > > JsonDeserializer jsonDeserializer = new
> >> > > JsonDeserializer<>();
> >> > > return jsonDeserializer;
> >> > > }
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > Which relates to:
> >> > >
> >> > > public class JsonSerializer<T> implements Serializer<T> {
> >> > >
> >> > > private Gson gson = new Gson();
> >> > >
> >> > > @Override
> >> > > public void configure(Map<String, ?> map, boolean b) {
> >> > >
> >> > > }
> >> > >
> >> > > @Override
> >> > > public byte[] serialize(String topic, T t) {
> >> > > return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> >> > > }
> >> > >
> >> > > @Override
> >> > > public void close() {
> >> > >
> >> > > }
> >> > > }
> >> > >
> >> > >
> >> > >
> >> > > public class JsonDeserializer<T> implements Deserializer<T> {
> >> > >
> >> > > private Gson gson = new Gson();
> >> > > private Class<T> deserializedClass;
> >> > >
> >> > > public JsonDeserializer(Class<T> deserializedClass) {
> >> > > this.deserializedClass = deserializedClass;
> >> > > }
> >> > >
> >> > > public JsonDeserializer() {
> >> > > }
> >> > >
> >> > > @Override
> >> > > @SuppressWarnings("unchecked")
> >> > > public void config

Re: reacting to a truststore change

2016-12-07 Thread Martin Gainty




From: Tuan Dang 
Sent: Wednesday, December 7, 2016 10:00 AM
To: users@kafka.apache.org
Subject: reacting to a truststore change

Hello all,

I'm working my way through Kafka 0.9 SSL/TLS authentication.

If I make a change to the truststore, either adding or removing a
certificate,  will Kafka automatically pick up the changes or would I need
to restart ?

My main issue is how to unauthorize a producer.  I've seen suggestions for
doing this by changing ACLs but I'd like to
stop at the connection level, either via a certificate revocation list or
via updating the truststore.

MG>deltas in kafka channel config(s) *could* update "running state" if 
ChannelBuilder architecture conformed to Observer pattern

https://en.wikipedia.org/wiki/Observer_pattern


/*unfortunately since ChannelBuilder current implementation is static the 
Observer Pattern has not been implemented yet */

public static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode,
                                    LoginType loginType, Map<String, ?> configs) {
    ChannelBuilder channelBuilder;

    switch (securityProtocol) {
        case SSL:
            requireNonNullMode(mode, securityProtocol);
            channelBuilder = new SslChannelBuilder(mode);
            break;
        case SASL_SSL:
        case SASL_PLAINTEXT:
            requireNonNullMode(mode, securityProtocol);
            if (loginType == null)
                throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
            channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol);
            break;
        case PLAINTEXT:
        case TRACE:
            channelBuilder = new PlaintextChannelBuilder();
            break;
        default:
            throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
    }

    channelBuilder.configure(configs);
    return channelBuilder;
}


MG>org.apache.kafka.common.network.ChannelBuilders loads SSLChannelBuilder 
statically so SSLConfig is only updated when jar is loaded

MG>conformance to Observer Pattern is already implemented in ESB

https://books.google.com/books?id=uemsBAAAQBAJ&pg=PA179&lpg=PA179&dq=ESB+Observer+pattern&source=bl&ots=aeshC1sBhF&sig=V9i8we6dYqUjx1vLZ0bO4a2Hxt8&hl=en&sa=X&ved=0ahUKEwj_u6rzuOLQAhUp_4MKHX0qAOcQ6AEIRDAG#v=onepage&q=ESB%20Observer%20pattern&f=false

MG>IMHO refactoring Kafka to implement the Observer Pattern should be an easy
refactor for 0.11.1x or later version(s)
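
(A purely hypothetical sketch of what such an observer-based refactor could
look like; none of these types exist in Kafka today:)

// Hypothetical interfaces, not part of org.apache.kafka.common.network.
interface TruststoreObserver {
    void truststoreChanged(java.nio.file.Path truststorePath);
}

class TruststoreWatcher {
    private final java.util.List<TruststoreObserver> observers =
            new java.util.concurrent.CopyOnWriteArrayList<>();

    void register(TruststoreObserver observer) {
        observers.add(observer);
    }

    // Called by whatever detects the file change, e.g. a WatchService poller.
    void fireChanged(java.nio.file.Path path) {
        for (TruststoreObserver observer : observers)
            observer.truststoreChanged(path);
    }
}

class ObservableSslChannelBuilder implements TruststoreObserver {
    private volatile javax.net.ssl.SSLContext sslContext;

    @Override
    public void truststoreChanged(java.nio.file.Path truststorePath) {
        // Rebuild the SSLContext so that new connections pick up added or
        // removed certificates without a broker restart.
        this.sslContext = rebuildContextFrom(truststorePath);
    }

    private javax.net.ssl.SSLContext rebuildContextFrom(java.nio.file.Path path) {
        // ... load the KeyStore from 'path', init a TrustManagerFactory,
        // and create a fresh SSLContext (omitted in this sketch) ...
        throw new UnsupportedOperationException("sketch only");
    }
}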

https://issues.apache.org/jira/browse/KAFKA/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel




Thanks, Tuan

MG>let us know if you need any assistance processing this request


reacting to a truststore change

2016-12-07 Thread Tuan Dang
Hello all,

I'm working my way through Kafka 0.9 SSL/TLS authentication.

If I make a change to the truststore, either adding or removing a
certificate,  will Kafka automatically pick up the changes or would I need
to restart ?

My main issue is how to unauthorize a producer.  I've seen suggestions for
doing this by changing ACLs but I'd like to
stop at the connection level, either via a certificate revocation list or
via updating the truststore.

Thanks, Tuan


Re: accessing state-store ala WordCount example

2016-12-07 Thread Eno Thereska
Hi Jon,

The "/windowed" namel in the web server example is just an example name, it 
could have been called differently too. It is built however on the Interactive 
Query APIs which are fixed. In the example code I mentioned we see the 
implementation as shown below. Again, the web server code is just a reference 
implementation that you can feel free to change or call differently (as long as 
you call store.fetch etc correctly). 



@GET()
@Path("/windowed/{storeName}/{key}/{from}/{to}")
@Produces(MediaType.APPLICATION_JSON)
public List<KeyValueBean> windowedByKey(@PathParam("storeName") final String storeName,
                                        @PathParam("key") final String key,
                                        @PathParam("from") final Long from,
                                        @PathParam("to") final Long to) {

  // Lookup the WindowStore with the provided storeName
  final ReadOnlyWindowStore<String, Long> store =
      streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
  if (store == null) {
    throw new NotFoundException();
  }

  // fetch the window results for the given key and time range
  final WindowStoreIterator<Long> results = store.fetch(key, from, to);

  final List<KeyValueBean> windowResults = new ArrayList<>();
  while (results.hasNext()) {
    final KeyValue<Long, Long> next = results.next();
    // convert the result to have the window time and the key (for display purposes)
    windowResults.add(new KeyValueBean(key + "@" + next.key, next.value));
  }
  return windowResults;
}


> On 7 Dec 2016, at 11:13, Jon Yeargers  wrote:
> 
> Im having trouble finding documentation on this new feature. Can you point
> me to anything?
> 
> Specifically on how to get available "from/to" values but more generally on
> how to use the "windowed" query.
> 
> On Wed, Dec 7, 2016 at 1:25 AM, Eno Thereska  wrote:
> 
>> Hi Jon,
>> 
>> This will be a windowed store. Have a look at the Jetty-server bits for
>> windowedByKey:
>> "/windowed/{storeName}/{key}/{from}/{to}"
>> 
>> Thanks
>> Eno
>> 
>>> On 6 Dec 2016, at 23:33, Jon Yeargers  wrote:
>>> 
>>> I copied out some of the WordCountInteractive
>>> > 1.x/kafka-streams/src/main/java/io/confluent/examples/
>> streams/interactivequeries/WordCountInteractiveQueriesExample.java>
>>> demo
>>> code to see how the REST access works. I have an aggregator
>>> 
>>> groupByKey().aggregate(LogLine::new,
>>>   new aggregate(),
>>>   TimeWindows.of(60 * 60 * 1000L),
>>>   collectorSerde, "agg_stream");
>>> 
>>> 
>>> I incorporated the Jetty-server bits from the sample. When I run it I can
>>> see results via the '/states/instances' entry point but nothing from the
>>> '/states/keyvalues/agg_stream/all'.
>>> 
>>> The aggregator is churning away so I'd assume the state store would have
>>> plenty of key/value pairs but it comes up empty.
>>> 
>>> What's the proper way to use this?
>> 
>> 



Re: Detecting when all the retries are expired for a message

2016-12-07 Thread Rajini Sivaram
Thanks Ismael, I hadn't seen the KIP. That does cover the issue described
here.

On Wed, Dec 7, 2016 at 10:39 AM, Ismael Juma  wrote:

> Note that Sumant has been working on a KIP proposal to make the producer
> timeout behaviour more intuitive:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 91+Provide+Intuitive+User+Timeouts+in+The+Producer
>
> Ismael
>
> On Wed, Dec 7, 2016 at 9:42 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com
> > wrote:
>
> > If you just want to test retries, you could restart Kafka while the
> > producer is running and you should see the producer retry while Kafka is
> > down/leader is being elected after Kafka restarts. If you specifically
> want
> > a TimeoutException to trigger all retries, I am not sure how you can. I
> > would suggest that you raise a JIRA since the current behaviour is not
> very
> > intuitive.
> >
> >
> > On Wed, Dec 7, 2016 at 6:51 AM, Mevada, Vatsal 
> > wrote:
> >
> > > @Asaf
> > >
> > >
> > >
> > > Do I need to raise new bug for this?
> > >
> > >
> > >
> > > @Rajini
> > >
> > >
> > >
> > > Please suggest some the configuration with which retries should work
> > > according to you. The code is already there in the mail chain. I am
> > adding
> > > it here again:
> > >
> > >
> > >
> > > public void produce(String topicName, String filePath, String
> > > bootstrapServers, String encoding) {
> > >
> > > try (BufferedReader bf = getBufferedReader(filePath,
> > > encoding);
> > >
> > > KafkaProducer producer
> =
> > > initKafkaProducer(bootstrapServers)) {
> > >
> > > String line;
> > >
> > > while ((line = bf.readLine()) != null)
> {
> > >
> > > producer.send(new
> > > ProducerRecord<>(topicName, line), (metadata, e) -> {
> > >
> > > if (e
> !=
> > > null) {
> > >
> > >
> > >   e.printStackTrace();
> > >
> > > }
> > >
> > > });
> > >
> > > }
> > >
> > > producer.flush();
> > >
> > > } catch (IOException e) {
> > >
> > > Throwables.propagate(e);
> > >
> > > }
> > >
> > > }
> > >
> > >
> > >
> > > private static KafkaProducer initKafkaProducer(String
> > > bootstrapServer) {
> > >
> > > Properties properties = new Properties();
> > >
> > > properties.put("bootstrap.servers", bootstrapServer);
> > >
> > > properties.put("key.serializer",
> StringSerializer.class.
> > > getCanonicalName());
> > >
> > > properties.put("value.serializer",StringSerializer.
> > > class.getCanonicalName());
> > >
> > > properties.put("acks", "-1");
> > >
> > > properties.put("retries", 5);
> > >
> > > properties.put("request.timeout.ms", 1);
> > >
> > > return new KafkaProducer<>(properties);
> > >
> > > }
> > >
> > >
> > >
> > > private BufferedReader getBufferedReader(String filePath, String
> > encoding)
> > > throws UnsupportedEncodingException, FileNotFoundException {
> > >
> > > return new BufferedReader(new InputStreamReader(new
> > > FileInputStream(filePath), Optional.ofNullable(encoding).
> > > orElse("UTF-8")));
> > >
> > > }
> > >
> > >
> > >
> > > Regards,
> > >
> > > Vatsal
> > >
> > >
> > >
> > > -Original Message-
> > > From: Rajini Sivaram [mailto:rajinisiva...@googlemail.com]
> > > Sent: 06 December 2016 17:27
> > > To: users@kafka.apache.org
> > > Subject: Re: Detecting when all the retries are expired for a message
> > >
> > >
> > >
> > > I believe batches in RecordAccumulator are expired after
> > > request.timeout.ms, so they wouldn't get retried in this case. I think
> > > the config options are quite confusing, making it hard to figure out
> the
> > > behavior without looking into the code.
> > >
> > >
> > >
> > > On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika  > > > wrote:
> > >
> > >
> > >
> > > > Vatsal:
> > >
> > > >
> > >
> > > > I don't think they merged the fix for this bug (retries doesn't work)
> > >
> > > > in 0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547
> > >
> > > >
> > >
> > > >
> > >
> > > > On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal
> > >
> > > > mailto:mev...@sky.optymyze.com>>
> > >
> > > > wrote:
> > >
> > > >
> > >
> > > > > Hello,
> > >
> > > > >
> > >
> > > > > Bumping up this thread in case anyone of you have any say on this
> > > issue.
> > >
> > > > >
> > >
> > > > > Regards,
> > >
> > > > > Vatsal
> > >
> > > > >
> > >
> > > > > -Original Message-
> > >
> > > > > From: Mevada, Vatsal
> > >
> > > > > Sent: 02 December 2016 16:16
> > >
> > > > > To: Kafka 

Re: accessing state-store ala WordCount example

2016-12-07 Thread Jon Yeargers
I'm having trouble finding documentation on this new feature. Can you point
me to anything?

Specifically on how to get available "from/to" values but more generally on
how to use the "windowed" query.

On Wed, Dec 7, 2016 at 1:25 AM, Eno Thereska  wrote:

> Hi Jon,
>
> This will be a windowed store. Have a look at the Jetty-server bits for
> windowedByKey:
> "/windowed/{storeName}/{key}/{from}/{to}"
>
> Thanks
> Eno
>
> > On 6 Dec 2016, at 23:33, Jon Yeargers  wrote:
> >
> > I copied out some of the WordCountInteractive
> >  1.x/kafka-streams/src/main/java/io/confluent/examples/
> streams/interactivequeries/WordCountInteractiveQueriesExample.java>
> > demo
> > code to see how the REST access works. I have an aggregator
> >
> > groupByKey().aggregate(LogLine::new,
> >new aggregate(),
> >TimeWindows.of(60 * 60 * 1000L),
> >collectorSerde, "agg_stream");
> >
> >
> > I incorporated the Jetty-server bits from the sample. When I run it I can
> > see results via the '/states/instances' entry point but nothing from the
> > '/states/keyvalues/agg_stream/all'.
> >
> > The aggregator is churning away so I'd assume the state store would have
> > plenty of key/value pairs but it comes up empty.
> >
> > What's the proper way to use this?
>
>


Re: Detecting when all the retries are expired for a message

2016-12-07 Thread Ismael Juma
Note that Sumant has been working on a KIP proposal to make the producer
timeout behaviour more intuitive:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer

Ismael

On Wed, Dec 7, 2016 at 9:42 AM, Rajini Sivaram  wrote:

> If you just want to test retries, you could restart Kafka while the
> producer is running and you should see the producer retry while Kafka is
> down/leader is being elected after Kafka restarts. If you specifically want
> a TimeoutException to trigger all retries, I am not sure how you can. I
> would suggest that you raise a JIRA since the current behaviour is not very
> intuitive.
>
>
> On Wed, Dec 7, 2016 at 6:51 AM, Mevada, Vatsal 
> wrote:
>
> > @Asaf
> >
> >
> >
> > Do I need to raise new bug for this?
> >
> >
> >
> > @Rajini
> >
> >
> >
> > Please suggest some the configuration with which retries should work
> > according to you. The code is already there in the mail chain. I am
> adding
> > it here again:
> >
> >
> >
> > public void produce(String topicName, String filePath, String
> > bootstrapServers, String encoding) {
> >
> > try (BufferedReader bf = getBufferedReader(filePath,
> > encoding);
> >
> > KafkaProducer producer =
> > initKafkaProducer(bootstrapServers)) {
> >
> > String line;
> >
> > while ((line = bf.readLine()) != null) {
> >
> > producer.send(new
> > ProducerRecord<>(topicName, line), (metadata, e) -> {
> >
> > if (e !=
> > null) {
> >
> >
> >   e.printStackTrace();
> >
> > }
> >
> > });
> >
> > }
> >
> > producer.flush();
> >
> > } catch (IOException e) {
> >
> > Throwables.propagate(e);
> >
> > }
> >
> > }
> >
> >
> >
> > private static KafkaProducer initKafkaProducer(String
> > bootstrapServer) {
> >
> > Properties properties = new Properties();
> >
> > properties.put("bootstrap.servers", bootstrapServer);
> >
> > properties.put("key.serializer", StringSerializer.class.
> > getCanonicalName());
> >
> > properties.put("value.serializer",StringSerializer.
> > class.getCanonicalName());
> >
> > properties.put("acks", "-1");
> >
> > properties.put("retries", 5);
> >
> > properties.put("request.timeout.ms", 1);
> >
> > return new KafkaProducer<>(properties);
> >
> > }
> >
> >
> >
> > private BufferedReader getBufferedReader(String filePath, String
> encoding)
> > throws UnsupportedEncodingException, FileNotFoundException {
> >
> > return new BufferedReader(new InputStreamReader(new
> > FileInputStream(filePath), Optional.ofNullable(encoding).
> > orElse("UTF-8")));
> >
> > }
> >
> >
> >
> > Regards,
> >
> > Vatsal
> >
> >
> >
> > -Original Message-
> > From: Rajini Sivaram [mailto:rajinisiva...@googlemail.com]
> > Sent: 06 December 2016 17:27
> > To: users@kafka.apache.org
> > Subject: Re: Detecting when all the retries are expired for a message
> >
> >
> >
> > I believe batches in RecordAccumulator are expired after
> > request.timeout.ms, so they wouldn't get retried in this case. I think
> > the config options are quite confusing, making it hard to figure out the
> > behavior without looking into the code.
> >
> >
> >
> > On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika  > > wrote:
> >
> >
> >
> > > Vatsal:
> >
> > >
> >
> > > I don't think they merged the fix for this bug (retries doesn't work)
> >
> > > in 0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547
> >
> > >
> >
> > >
> >
> > > On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal
> >
> > > mailto:mev...@sky.optymyze.com>>
> >
> > > wrote:
> >
> > >
> >
> > > > Hello,
> >
> > > >
> >
> > > > Bumping up this thread in case any of you has any say on this
> > issue.
> >
> > > >
> >
> > > > Regards,
> >
> > > > Vatsal
> >
> > > >
> >
> > > > -Original Message-
> >
> > > > From: Mevada, Vatsal
> >
> > > > Sent: 02 December 2016 16:16
> >
> > > > To: Kafka Users  users@kafka.apache.org>
> > >
> >
> > > > Subject: RE: Detecting when all the retries are expired for a
> >
> > > > message
> >
> > > >
> >
> > > > I executed the same producer code for a single record file with
> >
> > > > following
> >
> > > > config:
> >
> > > >
> >
> > > > properties.put("bootstrap.servers", bootstrapServer);
> >
> > > > properties.put("key.serializer",
> >
> > > > StringSerializer.class.getCanonicalName());
> >
> > > > properties.put("value.serializer",
> >
> > > > StringSerializer.class.getCan

Re: Detecting when all the retries are expired for a message

2016-12-07 Thread Ismael Juma
Hi Asaf,

That PR is for the backport to 0.9.0.x, the original change was merged to
trunk and is in 0.10.x.x.

Ismael

On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika  wrote:

> Vatsal:
>
> I don't think they merged the fix for this bug (retries doesn't work) in
> 0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547
>
>
> On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal 
> wrote:
>
> > Hello,
> >
> > Bumping up this thread in case any of you has any say on this issue.
> >
> > Regards,
> > Vatsal
> >
> > -Original Message-
> > From: Mevada, Vatsal
> > Sent: 02 December 2016 16:16
> > To: Kafka Users 
> > Subject: RE: Detecting when all the retries are expired for a message
> >
> > I executed the same producer code for a single record file with the following
> > config:
> >
> > properties.put("bootstrap.servers", bootstrapServer);
> > properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> > properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> > properties.put("acks", "-1");
> > properties.put("retries", 5);
> > properties.put("request.timeout.ms", 10000);
> >
> > I have kept request.timeout.ms=10000 to make sure that message delivery will
> > fail with a TimeoutException. Since retries is 5, the program should take at
> > least 50000 ms (50 seconds) to complete for a single record. However, the
> > program completes almost instantly with only one callback carrying a
> > TimeoutException. I suspect that the producer is not attempting any retries.
> > Or am I missing something in my code?
> >
> > My Kafka version is 0.10.0.1.
> >
> > Regards,
> > Vatsal
> > -Original Message-
> > From: Ismael Juma [mailto:isma...@gmail.com]
> > Sent: 02 December 2016 13:30
> > To: Kafka Users 
> > Subject: RE: Detecting when all the retries are expired for a message
> >
> > The callback is called after the retries have been exhausted.
> >
> > Ismael
> >
> > On 2 Dec 2016 3:34 am, "Mevada, Vatsal"  wrote:
> >
> > > @Ismael:
> > >
> > > I can handle TimeoutException in the callback. However, as per the
> > > documentation of Callback (link: https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
> > > TimeoutException is a retriable exception and it says that it "may be
> > > covered by increasing #.retries". So even if I get a TimeoutException in the
> > > callback, wouldn't it try to send the message again until all the retries
> > > are done? Would it be safe to assume that message delivery has failed
> > > permanently just because I see a TimeoutException in the callback?
> > >
> > > Here is a snippet from above mentioned documentation:
> > > "exception - The exception thrown during processing of this record.
> > > Null if no error occurred. Possible thrown exceptions include:
> > > Non-Retriable exceptions (fatal, the message will never be sent):
> > > InvalidTopicException OffsetMetadataTooLargeException
> > > RecordBatchTooLargeException RecordTooLargeException
> > > UnknownServerException Retriable exceptions (transient, may be covered
> > > by increasing #.retries): CorruptRecordException
> > > InvalidMetadataException NotEnoughReplicasAfterAppendException
> > > NotEnoughReplicasException OffsetOutOfRangeException TimeoutException
> > > UnknownTopicOrPartitionException"
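Since all of the retriable types in the list quoted above extend org.apache.kafka.common.errors.RetriableException, a callback along these lines can tell the two groups apart. This is only a sketch (producer and record stand for the objects already shown elsewhere in this thread), not code from the original mail:

// requires: import org.apache.kafka.common.errors.RetriableException;
producer.send(record, (metadata, e) -> {
    if (e == null) {
        return;                          // delivered successfully
    }
    if (e instanceof RetriableException) {
        // by the time the callback fires, the configured retries are already used up
        System.err.println("gave up after retries: " + e);
    } else {
        System.err.println("fatal, never retried: " + e);
    }
});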
> > >
> > > @asaf: My Kafka API version is 0.10.0.1, so I think I should not
> > > face the issue that you are mentioning. I mentioned the 0.9
> > > documentation link by mistake.
> > >
> > > Regards,
> > > Vatsal
> > > -Original Message-
> > > From: Asaf Mesika [mailto:asaf.mes...@gmail.com]
> > > Sent: 02 December 2016 00:32
> > > To: Kafka Users 
> > > Subject: Re: Detecting when all the retries are expired for a message
> > >
> > > There's a critical bug in that section that has only been fixed in
> > > 0.9.0.2, which has not been released yet. Without the fix it doesn't
> > really retry.
> > > I forked the kafka repo, applied the fix, built it and placed it in
> > > our own Nexus Maven repository until 0.9.0.2 will be released.
> > >
> > > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
> > >
> > > Feel free to use it.
> > >
> > > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:
> > >
> > > > The callback should give you what you are asking for. Has it not
> > > > worked as you expect when you tried it?
> > > >
> > > > Ismael
> > > >
> > > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal
> > > > 
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > >
> > > > >
> > > > > I am reading a file and dumping each record on Kafka. Here is my
> > > > > producer
> > > > > code:
> > > > >
> > > > >
> > > > >
> > > > > public void produce(String topicName, String filePath, String
> > > > > bootstrapServers, String encoding) {
> > > > >
> > > > > try (BufferedReader bf =
> > > > > getBufferedReader(filePath, encoding);
> > > > >
> > > > >   

Re: Consumer poll - no results

2016-12-07 Thread Harald Kirsch

Have you defined

auto.offset.reset: earliest

or otherwise made sure (KafkaConsumer.position()) that the consumer does 
not just wait for *new* messages to arrive?


Harald.
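
For reference, a minimal 0.10-style consumer with that setting could look like the sketch below; the broker address, group id and topic name are placeholders, not values from this thread:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class EarliestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // placeholder broker
        props.put("group.id", "demo-group");                  // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Without this, a group with no committed offsets starts at the log end
        // and only ever sees *new* messages.
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));   // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}

Note that auto.offset.reset only applies when the group has no committed offsets (or they are out of range); for a group that has already committed, KafkaConsumer.seekToBeginning(...) or a fresh group.id would be needed instead.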


On 06.12.2016 20:11, Mohit Anchlia wrote:

I see this message in the logs:

[2016-12-06 13:54:16,586] INFO [GroupCoordinator 0]: Preparing to
restabilize group DemoConsumer with old generation 3
(kafka.coordinator.GroupCoordinator)



On Tue, Dec 6, 2016 at 10:53 AM, Mohit Anchlia 
wrote:


I have a consumer polling a Kafka 0.10 topic. Even though the topic has
messages, the consumer poll is not fetching any of them. The thread dump
reveals:

"main" #1 prio=5 os_prio=0 tid=0x7f3ba4008800 nid=0x798 runnable
[0x7f3baa6c3000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0006c6d1f8b8> (a sun.nio.ch.Util$3)
- locked <0x0006c6d1f8a8> (a java.util.Collections$UnmodifiableSet)
- locked <0x0006c6d1f0b8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:470)
at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
- locked <0x0006c6d1eff8> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1031)





Re: Detecting when all the retries are expired for a message

2016-12-07 Thread Rajini Sivaram
If you just want to test retries, you could restart Kafka while the
producer is running and you should see the producer retry while Kafka is
down/leader is being elected after Kafka restarts. If you specifically want
a TimeoutException to trigger all retries, I am not sure how you can. I
would suggest that you raise a JIRA since the current behaviour is not very
intuitive.
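
For example, a small stand-alone producer along these lines can be left running while the broker is restarted; with retries and a non-zero retry.backoff.ms the retry attempts become visible in the producer log. This is only a sketch with placeholder broker and topic names, not code from this thread:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RetryObserver {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("key.serializer", StringSerializer.class.getCanonicalName());
        props.put("value.serializer", StringSerializer.class.getCanonicalName());
        props.put("acks", "all");              // wait for the in-sync replicas
        props.put("retries", 5);               // allow the producer to retry
        props.put("retry.backoff.ms", 1000);   // space retries out so they are easy to spot

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 300; i++) {
                producer.send(new ProducerRecord<>("retry-test", "msg-" + i), (metadata, e) -> {
                    if (e != null) {
                        // fires only once the retries for this record are exhausted
                        e.printStackTrace();
                    }
                });
                Thread.sleep(100);             // keep sending while the broker is bounced
            }
            producer.flush();
        }
    }
}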


On Wed, Dec 7, 2016 at 6:51 AM, Mevada, Vatsal 
wrote:

> @Asaf
>
>
>
> Do I need to raise a new bug for this?
>
>
>
> @Rajini
>
>
>
> Please suggest a configuration with which retries should work, according to
> you. The code is already there in the mail chain. I am adding
> it here again:
>
>
>
> public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
>     try (BufferedReader bf = getBufferedReader(filePath, encoding);
>          KafkaProducer<String, String> producer = initKafkaProducer(bootstrapServers)) {
>         String line;
>         while ((line = bf.readLine()) != null) {
>             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
>                 if (e != null) {
>                     e.printStackTrace();
>                 }
>             });
>         }
>         producer.flush();
>     } catch (IOException e) {
>         Throwables.propagate(e);
>     }
> }
>
> private static KafkaProducer<String, String> initKafkaProducer(String bootstrapServer) {
>     Properties properties = new Properties();
>     properties.put("bootstrap.servers", bootstrapServer);
>     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
>     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
>     properties.put("acks", "-1");
>     properties.put("retries", 5);
>     properties.put("request.timeout.ms", 10000);
>     return new KafkaProducer<>(properties);
> }
>
> private BufferedReader getBufferedReader(String filePath, String encoding)
>         throws UnsupportedEncodingException, FileNotFoundException {
>     return new BufferedReader(new InputStreamReader(new FileInputStream(filePath),
>             Optional.ofNullable(encoding).orElse("UTF-8")));
> }
>
>
>
> Regards,
>
> Vatsal
>
>
>
> -Original Message-
> From: Rajini Sivaram [mailto:rajinisiva...@googlemail.com]
> Sent: 06 December 2016 17:27
> To: users@kafka.apache.org
> Subject: Re: Detecting when all the retries are expired for a message
>
>
>
> I believe batches in RecordAccumulator are expired after
> request.timeout.ms, so they wouldn't get retried in this case. I think
> the config options are quite confusing, making it hard to figure out the
> behavior without looking into the code.
>
>
>
> On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika  > wrote:
>
>
>
> > Vatsal:
>
> >
>
> > I don't think they merged the fix for this bug (retries doesn't work)
>
> > in 0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547
>
> >
>
> >
>
> > On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal
>
> > mailto:mev...@sky.optymyze.com>>
>
> > wrote:
>
> >
>
> > > Hello,
>
> > >
>
> > > Bumping up this thread in case any of you has any say on this
> issue.
>
> > >
>
> > > Regards,
>
> > > Vatsal
>
> > >
>
> > > -Original Message-
>
> > > From: Mevada, Vatsal
>
> > > Sent: 02 December 2016 16:16
>
> > > To: Kafka Users mailto:users@kafka.apache.org>
> >
>
> > > Subject: RE: Detecting when all the retries are expired for a
>
> > > message
>
> > >
>
> > > I executed the same producer code for a single record file with the
> > > following config:
> > >
> > > properties.put("bootstrap.servers", bootstrapServer);
> > > properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> > > properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> > > properties.put("acks", "-1");
> > > properties.put("retries", 5);
> > > properties.put("request.timeout.ms", 10000);
>
> > >
>
> > > I have kept request.timeout.ms=10000 to make sure that message delivery
> > > will fail with a TimeoutException. Since retries is 5, the program should
> > > take at least 50000 ms (50 seconds) to complete for a single record.
> > > However, the program completes almost instantly with only one callback
> > > carrying a TimeoutException. I suspect that the producer is not attempting
> > > any retries. Or am I missing something in my code?
>
> > >
>
> > > My Kafka version i

Re: Best approach to frequently restarting consumer process

2016-12-07 Thread Harald Kirsch
With 'restart' I mean a 'let it crash' setup (as promoted by Erlang and
Akka, e.g.
http://doc.akka.io/docs/akka/snapshot/intro/what-is-akka.html). The
consumer gets into trouble due to an OOM, a runaway computation, or some
other condition that we want to preempt; it then crashes or is killed
externally.


So whether close() is called or not in the dying process, I don't know. 
But clearly the subscribe is called after a restart.


I understand that we are out of luck with this. We would have to
separate the crashing part out into a different operating system
process and keep the consumer itself running all the time. :-(


Thanks for the insight
Harald
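
A rough sketch of that separation without a second OS process is shown below: one long-lived consumer thread, with the crash-prone work on a worker thread that can be replaced without leaving the group. Broker, group, topic and the processing body are placeholders; offset commits and poll-interval/heartbeat tuning are left out for brevity:

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SupervisedWorkerConsumer {

    // Placeholder for the crash-prone work that currently forces a full restart.
    static void process(ConsumerRecord<String, String> record) {
        System.out.println("processing " + record.value());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "worker-group");               // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        ExecutorService worker = Executors.newSingleThreadExecutor();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events"));   // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Run the risky work off the consumer thread and wait for it.
                        worker.submit(() -> process(record)).get();
                    } catch (ExecutionException e) {
                        // "Let it crash": replace only the worker; the consumer keeps
                        // polling, so the group does not rebalance.
                        worker.shutdownNow();
                        worker = Executors.newSingleThreadExecutor();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }
}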

On 06.12.2016 19:26, Gwen Shapira wrote:

Can you clarify what you mean by "restart"? If you call
consumer.close() and consumer.subscribe() you will definitely trigger
a rebalance.

It doesn't matter if it's the "same consumer knocking"; we already
rebalance when you call consumer.close().

Since we want both consumer.close() and consumer.subscribe() to cause
rebalance immediately (and not wait for heartbeat), I don't think
we'll be changing their behavior.

Depending on why consumers need to restart, I'm wondering if you can
restart other threads in your application but keep the consumer up and
running to avoid the rebalances.

On Tue, Dec 6, 2016 at 7:18 AM, Harald Kirsch  wrote:

We have consumer processes which need to restart frequently, say, every 5
minutes. We have 10 of them so we are facing two restarts every minute on
average.

1) It seems that nearly every time a consumer restarts, the group is
rebalanced, even if the restart takes less than the heartbeat interval.

2) My guess is that the group manager just cannot know that the same
consumer is knocking at the door again.

Are my suspicions (1) and (2) correct? Is there a chance to fix this such
that a restart within the heartbeat interval does not lead to a re-balance?
Would a well defined client.id help?

Regards
Harald







Re: accessing state-store ala WordCount example

2016-12-07 Thread Eno Thereska
Hi Jon,

This will be a windowed store. Have a look at the Jetty-server bits for 
windowedByKey:
"/windowed/{storeName}/{key}/{from}/{to}"

Thanks
Eno
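
For completeness, the direct lookup behind that endpoint looks roughly like the sketch below. The store name "agg_stream" and the LogLine type come from the original post; the method wrapper, the key and the one-hour range are placeholders:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// 'streams' is the already-running KafkaStreams instance from the application.
static void printLastHour(KafkaStreams streams, String key) {
    ReadOnlyWindowStore<String, LogLine> store =
            streams.store("agg_stream", QueryableStoreTypes.<String, LogLine>windowStore());

    long to = System.currentTimeMillis();
    long from = to - 60 * 60 * 1000L;                    // the last hour of windows

    try (WindowStoreIterator<LogLine> it = store.fetch(key, from, to)) {
        while (it.hasNext()) {
            KeyValue<Long, LogLine> entry = it.next();   // key = window start timestamp
            System.out.println(entry.key + " -> " + entry.value);
        }
    }
}

The '/states/keyvalues/.../all' path targets a key-value store, so it will not return the contents of a windowed store, which matches what is being observed.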

> On 6 Dec 2016, at 23:33, Jon Yeargers  wrote:
> 
> I copied out some of the WordCountInteractive demo
> code to see how the REST access works. I have an aggregator
> 
> groupByKey().aggregate(LogLine::new,
>     new aggregate(),
>     TimeWindows.of(60 * 60 * 1000L),
>     collectorSerde, "agg_stream");
> 
> 
> I incorporated the Jetty-server bits from the sample. When I run it I can
> see results via the '/states/instances' entry point but nothing from the
> '/states/keyvalues/agg_stream/all'.
> 
> The aggregator is churning away so I'd assume the state store would have
> plenty of key/value pairs but it comes up empty.
> 
> What's the proper way to use this?