RE: Building API to make Kafka reactive

2016-06-28 Thread Lohith Samaga M
Hi Shekar,
Alternatively, you could make each stage of your pipeline write its status to
Cassandra (or another DB) and have your API read from it. With a Cassandra TTL, the
row is deleted automatically once the TTL expires, so no manual cleanup is required.
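
For illustration, a rough sketch with the DataStax Java driver -- the keyspace and
table names, the one-hour TTL, and the sessionId/stage/status variables are all
placeholders, not a prescribed schema:

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Session;

    Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
    Session session = cluster.connect("pipeline");

    // Each stage upserts its latest status; the row expires on its own
    // once the TTL (here one hour) has passed.
    session.execute(
        "INSERT INTO request_status (session_id, stage, status) " +
        "VALUES (?, ?, ?) USING TTL 3600",
        sessionId, stage, status);

    // The API answers status queries by reading the same table:
    //   SELECT stage, status FROM request_status WHERE session_id = ?;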

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga



-Original Message-
From: Shekar Tippur [mailto:ctip...@gmail.com] 
Sent: Wednesday, June 29, 2016 12.10
To: users
Subject: Building API to make Kafka reactive

I am looking at building a reactive API on top of Kafka.
This API produces events to a Kafka topic. I want to add a unique session id into
the payload.
The data gets transformed as it goes through the different stages of a pipeline. I
want to specify a final topic from which the API can know that the processing
was successful.
The API should report a different status at each stage of the pipeline:
At ingestion, the API responds with "submitted".
While in progress, the API returns "in progress".
After successful completion, the API returns "Success".

A couple of questions:
1. Is this feasible?
2. I was looking at Project Reactor (https://projectreactor.io), where the docs
talk about an event bus. I wanted to see if I can implement a consumer that points
to the "end" topic and throws an event onto the event bus.
Since I would know the session ID, I can process the request accordingly.

Appreciate your inputs.

- Shekar


Building API to make Kafka reactive

2016-06-28 Thread Shekar Tippur
I am looking at building a reactive API on top of Kafka.
This API produces events to a Kafka topic. I want to add a unique session id
into the payload.
The data gets transformed as it goes through the different stages of a
pipeline. I want to specify a final topic from which the API can know that
the processing was successful.
The API should report a different status at each stage of the pipeline:
At ingestion, the API responds with "submitted".
While in progress, the API returns "in progress".
After successful completion, the API returns "Success".

A couple of questions:
1. Is this feasible?
2. I was looking at Project Reactor (https://projectreactor.io), where the
docs talk about an event bus. I wanted to see if I can implement a consumer
that points to the "end" topic and throws an event onto the event bus.
Since I would know the session ID, I can process the request accordingly.
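
To make question 2 concrete, here is roughly the shape I have in mind, with a
plain Java consumer standing in for the Reactor event bus. The "pipeline-end"
topic name and the convention of carrying the session id as the record key are
assumptions on my part:

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // One pending future per session id, created when the API accepts a request.
    ConcurrentMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "status-api");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("pipeline-end"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // The key carries the session id; completing the future flips
            // the API's answer from "in progress" to "Success".
            CompletableFuture<String> future = pending.remove(record.key());
            if (future != null)
                future.complete("Success");
        }
    }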

Appreciate your inputs.

- Shekar


Re: Question about bootstrap processing in KafkaStreams.

2016-06-28 Thread Gwen Shapira
Upgrade :)

On Tue, Jun 28, 2016 at 6:49 PM, Rohit Valsakumar  wrote:
> Hi Jay,
>
> Thanks for the reply.
>
> Unfortunately in our case, due to legacy reasons, we are using
> WallclockTimestampExtractor in the application for all the streams, and the
> existing messages in the stream probably won't have timestamps as they are
> being produced by legacy clients. So the events are being ingested with
> processing times and it may not be possible to synchronize based on the
> message timestamps. What do you recommend for this scenario?
>
> Rohit
>
> On 6/28/16, 5:18 PM, "Jay Kreps"  wrote:
>
>>I think you may get this for free as Kafka Streams attempts to align
>>consumption across different topics/partitions by the timestamp in the
>>messages. So in a case where you are starting a job fresh and it has a
>>database changelog to consume and an event stream to consume, it will
>>attempt to keep the KTable at the "time" the event stream is at. This is
>>only a heuristic, of course, since messages are not necessarily strongly
>>ordered by time. I think this is likely mostly the same but slightly
>>better
>>than the bootstrap usage in Samza but also covers other cases of
>>alignment.
>>
>>If you want more control you can override the timestamp extractor that
>>associates time and hence priority for the streams:
>>https://kafka.apache.org/0100/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html
>>
>>-Jay
>>
>>On Tue, Jun 28, 2016 at 2:49 PM, Rohit Valsakumar 
>>wrote:
>>
>>> Hi all,
>>>
>>> Is there a way to consume all the contents of a Kafka topic into a KTable
>>> before doing a left join with another KStream?
>>>
>>> I am looking at something that simulates a bootstrap topic in a Samza job.
>>>
>>> Thanks,
>>> Rohit Valsakumar
>>>
>>>


Re: Question about bootstrap processing in KafkaStreams.

2016-06-28 Thread Rohit Valsakumar
Hi Jay,

Thanks for the reply.

Unfortunately in our case, due to legacy reasons, we are using
WallclockTimestampExtractor in the application for all the streams, and the
existing messages in the stream probably won't have timestamps as they are
being produced by legacy clients. So the events are being ingested with
processing times and it may not be possible to synchronize based on the
message timestamps. What do you recommend for this scenario?

Rohit

On 6/28/16, 5:18 PM, "Jay Kreps"  wrote:

>I think you may get this for free as Kafka Streams attempts to align
>consumption across different topics/partitions by the timestamp in the
>messages. So in a case where you are starting a job fresh and it has a
>database changelog to consume and an event stream to consume, it will
>attempt to keep the KTable at the "time" the event stream is at. This is
>only a heuristic, of course, since messages are not necessarily strongly
>ordered by time. I think this is likely mostly the same but slightly
>better
>than the bootstrap usage in Samza but also covers other cases of
>alignment.
>
>If you want more control you can override the timestamp extractor that
>associates time and hence priority for the streams:
>https://kafka.apache.org/0100/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html
>
>-Jay
>
>On Tue, Jun 28, 2016 at 2:49 PM, Rohit Valsakumar 
>wrote:
>
>> Hi all,
>>
>> Is there a way to consume all the contents of a Kafka topic into a KTable
>> before doing a left join with another KStream?
>>
>> I am looking at something that simulates a bootstrap topic in a Samza job.
>>
>> Thanks,
>> Rohit Valsakumar
>>
>>






Re: kafka + autoscaling groups fuckery

2016-06-28 Thread Gwen Shapira
Charity,

1. Nothing you do seems crazy to me. Kafka should be able to work with
auto-scaling and we should be able to fix the issues you are running
into.

There are a few things you should be careful about when using the method
you described, though:
1.1 Your life may be a bit simpler if you have a way of starting a new
broker with the same ID as the old one - this means it will
automatically pick up the old replicas and you won't need to
rebalance. Makes life slightly easier in some cases.
1.2 Careful not to rebalance too many partitions at once - you only
have so much bandwidth, and currently Kafka will not throttle
rebalancing traffic.

2. I think your rebalance script is not rebalancing the offsets topic?
It still has a replica on broker 1002. You have two good replicas, so
you are nowhere near disaster, but make sure you get this working
too.
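
For reference, moving the offsets topic uses the same reassignment JSON as any
other topic. Something like this (the broker ids are from your example, and only
partition 0 is shown for brevity), fed to kafka-reassign-partitions.sh
--zookeeper <zk> --reassignment-json-file move.json --execute:

    {"version": 1, "partitions": [
      {"topic": "__consumer_offsets", "partition": 0, "replicas": [1001, 1003, 1004]}
    ]}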

3. From the logs, it looks like something a bit more complex happened...
You started with brokers 1001, 1002 and 1003; by around 00:08 or so,
they are all gone. Then 1005, 1006, 1007 and 1008 show up, but they
still have no replicas. Then you get 1001, 1003, 1004 and 1005? And
then it moves to 1001, 1003, 1004 and 1009? I'm not sure I managed to
piece this together correctly (we need better log-extraction tools for
sure...), but it looks like we had plenty of opportunities for things
to go wrong :)

4. We have a known race condition where two leader elections in close
proximity can cause a consumer to accidentally get ahead.
One culprit is "auto.leader.rebalance.enable=true" - this can trigger
leader election at a bad time (see:
https://issues.apache.org/jira/browse/KAFKA-3670), which can lead to
the loss of the last offset after consumers saw it. In cases where the
admin controls rebalances, we often turn it off. You can trigger
rebalances based on your own knowledge without Kafka automatically doing
extra rebalancing.

Another culprit can be "unclean.leader.election.enable", but you don't
have that :)

A common work-around is to configure the consumer to handle the "offset
out of range" exception by jumping to the last offset available in the
log. This is the behavior of the Java client, and it would have saved
your consumer here. The Go client looks very low level, so I don't know
how easy it is to do that there.
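
In Java-client terms, the jump looks roughly like this (a sketch only: it
assumes auto.offset.reset=none so that poll() surfaces the exception, and the
consumer setup is omitted):

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
    import org.apache.kafka.common.TopicPartition;

    try {
        ConsumerRecords<String, String> records = consumer.poll(100);
        // ... process records ...
    } catch (OffsetOutOfRangeException e) {
        // Our stored offset is outside the broker's range (e.g. after the
        // replica we were reading from was replaced); jump to the newest
        // offset and carry on, accepting the gap.
        for (TopicPartition tp : e.partitions())
            consumer.seekToEnd(tp);
    }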

If I were you, I'd retest your ASG scripts without the auto leader
election - since your own scripts can / should handle that.

Hope this helps,

Gwen

On Tue, Jun 28, 2016 at 3:49 PM, Charity Majors  wrote:
> Hi there,
>
> I just finished implementing kafka + autoscaling groups in a way that made
> sense to me.  I have a _lot_ of experience with ASGs and various storage
> types but I'm a kafka noob (about 4-5 months of using in development and
> staging and pre-launch production).
>
> It seems to be working fine from the Kafka POV but causing troubling side
> effects elsewhere that I don't understand.  I don't know enough about Kafka
> to know if my implementation is just fundamentally flawed for some reason,
> or if so how and why.
>
> My process is basically this:
>
> - Terminate a node, or increment the size of the ASG by one.  (I'm not doing
> any graceful shutdowns because I don't want to rely on graceful shutdowns,
> and I'm not attempting to act upon more than one node at a time.  Planning
> on doing a ZK lock or something later to enforce one process at a time, if I
> can work the major kinks out.)
>
> - Firstboot script, which runs on all hosts from rc.init.  (We run ASGs for
> *everything*.)  It infers things like the chef role, environment, cluster
> name, etc, registers DNS, bootstraps and runs chef-client, etc.  For storage
> nodes, it formats and mounts a PIOPS volume under the right mount point, or
> just remounts the volume if it already contains data.  Etc.
>
> - Run a balancing script from firstboot on kafka nodes.  It checks to see
> how many brokers there are and what their ids are, and checks for any
> underbalanced partitions with less than 3 ISRs.  Then we generate a new
> assignment file for rebalancing partitions, and execute it.  We watch on the
> host for all the partitions to finish rebalancing, then complete.
>
> - So far so good.  I have repeatedly killed kafka nodes and had them come
> up, rebalance the cluster, and everything on the kafka side looks healthy.
> All the partitions have the correct number of ISRs, etc.
>
> But after doing this, we have repeatedly gotten into a state where consumers
> that are pulling off the kafka partitions enter a weird state where their
> last known offset is *ahead* of the last known offset for that partition,
> and we can't recover from it.
>
> An example.  Last night I terminated ... I think it was broker 1002 or 1005,
> and it came back up as broker 1009.  It rebalanced on boot, everything
> looked good from the kafka side.  This morning we noticed that the storage
> node that maps to partition 5 has been broken for like 22 hours, it thinks
> the next offset is too far ahead / out of bounds so stoppe

Re: Question about bootstrap processing in KafkaStreams.

2016-06-28 Thread Jay Kreps
I think you may get this for free as Kafka Streams attempts to align
consumption across different topics/partitions by the timestamp in the
messages. So in a case where you are starting a job fresh and it has a
database changelog to consume and an event stream to consume, it will
attempt to keep the KTable at the "time" the event stream is at. This is
only a heuristic, of course, since messages are not necessarily strongly
ordered by time. I think this is likely mostly the same but slightly better
than the bootstrap usage in Samza but also covers other cases of alignment.

If you want more control you can override the timestamp extractor that
associates time and hence priority for the streams:
https://kafka.apache.org/0100/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html
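
A minimal extractor along those lines might look like this -- a sketch against
the 0.10.0 interface, with the payload parsing left as an app-specific stub:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class PayloadTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
            // Use the built-in timestamp when the message format carries one
            // (0.10+); otherwise fall back to a timestamp from the payload.
            if (record.timestamp() >= 0)
                return record.timestamp();
            return timestampFromPayload(record.value());
        }

        private long timestampFromPayload(Object value) {
            return System.currentTimeMillis(); // placeholder for real parsing
        }
    }

You would then wire it in via StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG.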

-Jay

On Tue, Jun 28, 2016 at 2:49 PM, Rohit Valsakumar 
wrote:

> Hi all,
>
> Is there a way to consume all the contents of a Kafka topic into a KTable
> before doing a left join with another KStream?
>
> I am looking at something that simulates a bootstrap topic in a Samza job.
>
> Thanks,
> Rohit Valsakumar
>
>


Re: kafka + autoscaling groups fuckery

2016-06-28 Thread Charity Majors
Reasons.

Investigated it thoroughly, believe me.  Some of the limitations that
Kinesis uses to protect itself are non-starters for us.

Forgot to mention, we are using 0.9.0.1-0.



On Tue, Jun 28, 2016 at 3:56 PM, Pradeep Gollakota 
wrote:

> Just out of curiosity, if you guys are in AWS for everything, why not use
> Kinesis?
>
> On Tue, Jun 28, 2016 at 3:49 PM, Charity Majors  wrote:
>
> > Hi there,
> >
> > I just finished implementing kafka + autoscaling groups in a way that
> made
> > sense to me.  I have a _lot_ of experience with ASGs and various storage
> > types but I'm a kafka noob (about 4-5 months of using it in development and
> > staging and pre-launch production).
> >
> > It seems to be working fine from the Kafka POV but causing troubling side
> > effects elsewhere that I don't understand.  I don't know enough about
> Kafka
> > to know if my implementation is just fundamentally flawed for some
> reason,
> > or if so how and why.
> >
> > My process is basically this:
> >
> > - *Terminate a node*, or increment the size of the ASG by one.  (I'm not
> > doing any graceful shutdowns because I don't want to rely on graceful
> > shutdowns, and I'm not attempting to act upon more than one node at a
> > time.  Planning on doing a ZK lock or something later to enforce one
> > process at a time, if I can work the major kinks out.)
> >
> > - *Firstboot script,* which runs on all hosts from rc.init.  (We run ASGs
> > for *everything*.)  It infers things like the chef role, environment,
> > cluster name, etc, registers DNS, bootstraps and runs chef-client, etc.
> > For storage nodes, it formats and mounts a PIOPS volume under the right
> > mount point, or just remounts the volume if it already contains data.
> Etc.
> >
> > - *Run a balancing script from firstboot* on kafka nodes.  It checks to
> > see how many brokers there are and what their ids are, and checks for any
> > underbalanced partitions with less than 3 ISRs.  Then we generate a new
> > assignment file for rebalancing partitions, and execute it.  We watch on
> > the host for all the partitions to finish rebalancing, then complete.
> >
> > *- So far so good*.  I have repeatedly killed kafka nodes and had them
> > come up, rebalance the cluster, and everything on the kafka side looks
> > healthy.  All the partitions have the correct number of ISRs, etc.
> >
> > But after doing this, we have repeatedly gotten into a state where
> > consumers that are pulling off the kafka partitions enter a weird state
> > where their last known offset is *ahead* of the last known offset for
> that
> > partition, and we can't recover from it.
> >
> > *An example.*  Last night I terminated ... I think it was broker 1002 or
> > 1005, and it came back up as broker 1009.  It rebalanced on boot,
> > everything looked good from the kafka side.  This morning we noticed that
> > the storage node that maps to partition 5 has been broken for like 22
> > hours, it thinks the next offset is too far ahead / out of bounds so
> > stopped consuming.  This happened shortly after broker 1009 came online
> and
> > the consumer caught up.
> >
> > From the storage node log:
> >
> > time="2016-06-28T21:51:48.286035635Z" level=info msg="Serving at
> > 0.0.0.0:8089..."
> > time="2016-06-28T21:51:48.293946529Z" level=error msg="Error creating
> > consumer" error="kafka server: The requested offset is outside the range
> of
> > offsets maintained by the server for the given topic/partition."
> > time="2016-06-28T21:51:48.294532365Z" level=error msg="Failed to start
> > services: kafka server: The requested offset is outside the range of
> > offsets maintained by the server for the given topic/partition."
> > time="2016-06-28T21:51:48.29461156Z" level=info msg="Shutting down..."
> >
> > From the mysql mapping of partitions to storage nodes/statuses:
> >
> > PRODUCTION ubuntu@retriever-112c6d8d:/srv/hound/retriever/log$
> > hound-kennel
> >
> > Listing by default. Use -action <..., setstate, addslot, removeslot, removenode> for other actions
> >
> > Part    Status  Last Updated                    Hostname
> > 0       live    2016-06-28 22:29:10 +0000 UTC   retriever-772045ec
> > 1       live    2016-06-28 22:29:29 +0000 UTC   retriever-75e0e4f2
> > 2       live    2016-06-28 22:29:25 +0000 UTC   retriever-78804480
> > 3       live    2016-06-28 22:30:01 +0000 UTC   retriever-c0da5f85
> > 4       live    2016-06-28 22:29:42 +0000 UTC   retriever-122c6d8e
> > 5               2016-06-28 21:53:48 +0000 UTC
> >
> >
> > PRODUCTION ubuntu@retriever-112c6d8d:/srv/hound/retriever/log$
> > hound-kennel -partition 5 -action nextoffset
> >
> > Next offset for partition 5: 12040353
> >
> >
> > Interestingly, the primary for partition 5 is 1004, and its follower is
> > the new node 1009.  (Partition 2 has 1009 as its leader and 1004 as its
> > follower, and seems just fine.)
> >
> > I've attached all the kafka logs for the broker 1009 node since it
> 

Re: kafka + autoscaling groups fuckery

2016-06-28 Thread Pradeep Gollakota
Just out of curiosity, if you guys are in AWS for everything, why not use
Kinesis?

On Tue, Jun 28, 2016 at 3:49 PM, Charity Majors  wrote:

> Hi there,
>
> I just finished implementing kafka + autoscaling groups in a way that made
> sense to me.  I have a _lot_ of experience with ASGs and various storage
> types but I'm a kafka noob (about 4-5 months of using it in development and
> staging and pre-launch production).
>
> It seems to be working fine from the Kafka POV but causing troubling side
> effects elsewhere that I don't understand.  I don't know enough about Kafka
> to know if my implementation is just fundamentally flawed for some reason,
> or if so how and why.
>
> My process is basically this:
>
> - *Terminate a node*, or increment the size of the ASG by one.  (I'm not
> doing any graceful shutdowns because I don't want to rely on graceful
> shutdowns, and I'm not attempting to act upon more than one node at a
> time.  Planning on doing a ZK lock or something later to enforce one
> process at a time, if I can work the major kinks out.)
>
> - *Firstboot script,* which runs on all hosts from rc.init.  (We run ASGs
> for *everything*.)  It infers things like the chef role, environment,
> cluster name, etc, registers DNS, bootstraps and runs chef-client, etc.
> For storage nodes, it formats and mounts a PIOPS volume under the right
> mount point, or just remounts the volume if it already contains data.  Etc.
>
> - *Run a balancing script from firstboot* on kafka nodes.  It checks to
> see how many brokers there are and what their ids are, and checks for any
> underbalanced partitions with less than 3 ISRs.  Then we generate a new
> assignment file for rebalancing partitions, and execute it.  We watch on
> the host for all the partitions to finish rebalancing, then complete.
>
> *- So far so good*.  I have repeatedly killed kafka nodes and had them
> come up, rebalance the cluster, and everything on the kafka side looks
> healthy.  All the partitions have the correct number of ISRs, etc.
>
> But after doing this, we have repeatedly gotten into a state where
> consumers that are pulling off the kafka partitions enter a weird state
> where their last known offset is *ahead* of the last known offset for that
> partition, and we can't recover from it.
>
> *An example.*  Last night I terminated ... I think it was broker 1002 or
> 1005, and it came back up as broker 1009.  It rebalanced on boot,
> everything looked good from the kafka side.  This morning we noticed that
> the storage node that maps to partition 5 has been broken for like 22
> hours, it thinks the next offset is too far ahead / out of bounds so
> stopped consuming.  This happened shortly after broker 1009 came online and
> the consumer caught up.
>
> From the storage node log:
>
> time="2016-06-28T21:51:48.286035635Z" level=info msg="Serving at
> 0.0.0.0:8089..."
> time="2016-06-28T21:51:48.293946529Z" level=error msg="Error creating
> consumer" error="kafka server: The requested offset is outside the range of
> offsets maintained by the server for the given topic/partition."
> time="2016-06-28T21:51:48.294532365Z" level=error msg="Failed to start
> services: kafka server: The requested offset is outside the range of
> offsets maintained by the server for the given topic/partition."
> time="2016-06-28T21:51:48.29461156Z" level=info msg="Shutting down..."
>
> From the mysql mapping of partitions to storage nodes/statuses:
>
> PRODUCTION ubuntu@retriever-112c6d8d:/srv/hound/retriever/log$
> hound-kennel
>
> Listing by default. Use -action <..., setstate, addslot, removeslot, removenode> for other actions
>
> Part    Status  Last Updated                    Hostname
> 0       live    2016-06-28 22:29:10 +0000 UTC   retriever-772045ec
> 1       live    2016-06-28 22:29:29 +0000 UTC   retriever-75e0e4f2
> 2       live    2016-06-28 22:29:25 +0000 UTC   retriever-78804480
> 3       live    2016-06-28 22:30:01 +0000 UTC   retriever-c0da5f85
> 4       live    2016-06-28 22:29:42 +0000 UTC   retriever-122c6d8e
> 5               2016-06-28 21:53:48 +0000 UTC
>
>
> PRODUCTION ubuntu@retriever-112c6d8d:/srv/hound/retriever/log$
> hound-kennel -partition 5 -action nextoffset
>
> Next offset for partition 5: 12040353
>
>
> Interestingly, the primary for partition 5 is 1004, and its follower is
> the new node 1009.  (Partition 2 has 1009 as its leader and 1004 as its
> follower, and seems just fine.)
>
> I've attached all the kafka logs for the broker 1009 node since it
> launched yesterday.
>
> I guess my main question is: *Is there something I am fundamentally
> missing about the kafka model that makes it not play well with
> autoscaling?*  I see a couple of other people on the internet talking
> about using ASGs with kafka, but always in the context of maintaining a
> list of broker ids and reusing them.
>
> *I don't want to do that.  I want the path for hardware termination,
> expanding the ASG size, and

Question about bootstrap processing in KafkaStreams.

2016-06-28 Thread Rohit Valsakumar
Hi all,

Is there a way to consume all the contents of a Kafka topic into a KTable
before doing a left join with another KStream?

I am looking at something that simulates a bootstrap topic in a Samza job.

Thanks,
Rohit Valsakumar





Re: Mirror maker setup - multi node

2016-06-28 Thread Gerard Klijs
Then it's much simpler: just use replication-factor=3 for your topics,
either by setting it when creating topics manually, or by
setting default.replication.factor=3 in the broker settings. Kafka will
then take care of partition leadership so the load is
balanced, and if some broker fails, the clients will connect to the other
brokers.
The mirror maker tool is only for copying data between clusters.
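
For example, when creating a topic by hand (the ZK address and partition count
here are placeholders):

    bin/kafka-topics.sh --create --zookeeper localhost:2181 \
        --replication-factor 3 --partitions 8 --topic mytopic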

On Tue, Jun 28, 2016 at 2:50 PM cs user  wrote:

> Hi there,
>
> I mean 1 cluster with 3 nodes. So I will need to run the mirror maker
> cluster on each of the 3 nodes in the cluster, in case of the loss of a
> node, the other 2 will continue to pull messages off the consumer cluster.
> It does seem to work correctly when I tested it. It just warns about topics
> with only one partition, when multiple clients are trying to consume from
> it:
>
> "No broker partitions consumed by consumer thread"
>
> One problem I have found is that when a topic is created, at first the
> mirror is unable to consume messages instantly. It seems that only say 70%
> of the messages (say 7,000 out of 10,000) that were sent to the newly created
> topic make it to the mirror.
>
> After the first batch, however, the messages seem to be mirrored correctly.
>
> I checked the mirror maker process logs and found the following, just as
> the topic was created:
>
> [2016-06-28 11:56:46,339] WARN Error while fetching metadata with
> correlation id 0 : {topictest4=LEADER_NOT_AVAILABLE}
> (org.apache.kafka.clients.NetworkClient)
> [2016-06-28 11:56:46,545] WARN Error while fetching metadata with
> correlation id 1 : {topictest4=LEADER_NOT_AVAILABLE}
> (org.apache.kafka.clients.NetworkClient)
> [2016-06-28 11:56:46,649] WARN Error while fetching metadata with
> correlation id 2 : {topictest4=LEADER_NOT_AVAILABLE}
> (org.apache.kafka.clients.NetworkClient)
>
> Is there any reason not all of the messages made it through? Is there a way
> to reset the offset so it tries to sync the messages from the beginning?
>
> Thanks!
>
> On Tue, Jun 28, 2016 at 12:00 PM, Gerard Klijs 
> wrote:
>
> > With 3 nodes, I assume you mean 3 clusters? If I understand correctly,
> say
> > you have 3 clusters, A, B, and C, you simultaneously:
> > - want to copy from A and B to C, to get an aggregation in C
> > - want to copy from A and C to B, to get a fail-back aggregation in B.
> > Now what will happen when a message is produced in cluster A?
> > - it will be copied to both C and B.
> > - the copy will cause a new copy in C and B,
> > etc.
> > There are several ways out of this, depending on your use case. It's pretty
> > easy to change the behaviour of the mirrormaker, for example to copy it to
> > $topic-aggregation instead of $topic, and to not copy it when the topic
> > ends with aggregation
> >
> > On Tue, Jun 28, 2016 at 10:15 AM cs user  wrote:
> >
> > > Hi All,
> > >
> > > So I understand I can run the following to aggregate topics from two
> > > different clusters into a mirror:
> > >
> > > bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
> > > sourceCluster1Consumer.config --consumer.config
> > > sourceCluster2Consumer.config --num.streams 2 --producer.config
> > > targetClusterProducer.config --whitelist=".*"
> > >
> > > Let's say my kafka mirror cluster consists of 3 nodes, can the above
> > process
> > > be started on each of the 3 nodes, so that in the event it fails on one
> > the
> > > other 2 will keep going?
> > >
> > > Or should only one of the nodes attempt to perform the aggregation?
> > >
> > > Thanks!
> > >
> >
>


Re: Producer Properties

2016-06-28 Thread Chris Barlock
Thank you, Dan!

Chris





From:   "Dan Bahir (BLOOMBERG/ 120 PARK)" 
To: users@kafka.apache.org
Date:   06/28/2016 09:49 AM
Subject: Re: Producer Properties



Hi Chris, 

The new producer returns a future, so it works in an async manner; hence there is
no need for the producer.type property.

0.8 -> 0.10
batch.num.messages -> batch.size
queue.buffering.max.ms -> linger.ms

Your assumptions for the serializers look correct.

Take a look at the documentation for both versions:
http://kafka.apache.org/081/documentation.html and
http://kafka.apache.org/0100/documentation.html.

Hope that helps


From: users@kafka.apache.org At: 06/27/16 09:34:55
To: users@kafka.apache.org
Subject: Re: Producer Properties

Anybody?

Chris


From:   Chris Barlock/Raleigh/IBM@IBMUS
To: users@kafka.apache.org
Date:   06/24/2016 04:56 PM
Subject: Producer Properties


I started porting our code from Kafka 0.8.2.1 to 0.10.0.0 and found my
producer code blowing up because of some changes to the config.  For
example, metadata.broker.list is now bootstrap.servers.  I discovered the
ProducerConfig class which has, at least, some of the config keys.  Before
I screw this up, I'd like some confirmation of the right mappings for
these config parameters in our 0.8.2.1 code:

serializer.class        Maybe value.serializer = VALUE_SERIALIZER_CLASS_CONFIG?
key.serializer.class    Maybe key.serializer = KEY_SERIALIZER_CLASS_CONFIG?
producer.type           Not in ProducerConfig
batch.num.messages      Not in ProducerConfig, unless maybe batch.size = BATCH_SIZE_CONFIG?
queue.buffering.max.ms  Not in ProducerConfig

Thanks!

Chris








Re: Producer Properties

2016-06-28 Thread Dan Bahir (BLOOMBERG/ 120 PARK)
Hi Chris, 

The new producer returns a future, so it works in an async manner; hence there is no
need for the producer.type property.

0.8 -> 0.10
batch.num.messages -> batch.size
queue.buffering.max.ms -> linger.ms

Your assumptions for the serializers look correct.

Take a look at the documentation for both versions:
http://kafka.apache.org/081/documentation.html and
http://kafka.apache.org/0100/documentation.html.
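
Put together, a 0.10 producer with the mapped settings looks roughly like this
(the serializers and sizes are just examples):

    import java.util.Properties;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");  // was metadata.broker.list
    props.put("key.serializer",                      // was key.serializer.class
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",                    // was serializer.class
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("batch.size", 16384);  // was batch.num.messages; now bytes, not a count
    props.put("linger.ms", 5);       // was queue.buffering.max.ms

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    // send() is asynchronous and returns a Future, which is why
    // producer.type went away.
    Future<RecordMetadata> ack =
        producer.send(new ProducerRecord<>("mytopic", "key", "value"));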

Hope that helps


From: users@kafka.apache.org At: 06/27/16 09:34:55
To: users@kafka.apache.org
Subject: Re: Producer Properties

Anybody?

Chris


From:   Chris Barlock/Raleigh/IBM@IBMUS
To: users@kafka.apache.org
Date:   06/24/2016 04:56 PM
Subject: Producer Properties


I started porting our code from Kafka 0.8.2.1 to 0.10.0.0 and found my
producer code blowing up because of some changes to the config.  For
example, metadata.broker.list is now bootstrap.servers.  I discovered the
ProducerConfig class which has, at least, some of the config keys.  Before
I screw this up, I'd like some confirmation of the right mappings for
these config parameters in our 0.8.2.1 code:

serializer.class        Maybe value.serializer = VALUE_SERIALIZER_CLASS_CONFIG?
key.serializer.class    Maybe key.serializer = KEY_SERIALIZER_CLASS_CONFIG?
producer.type           Not in ProducerConfig
batch.num.messages      Not in ProducerConfig, unless maybe batch.size = BATCH_SIZE_CONFIG?
queue.buffering.max.ms  Not in ProducerConfig

Thanks!

Chris




Re: Mirror maker setup - multi node

2016-06-28 Thread cs user
Hi there,

I mean 1 cluster with 3 nodes. So I will need to run the mirror maker
process on each of the 3 nodes in the cluster, in case of the loss of a
node, the other 2 will continue to pull messages off the consumer cluster.
It does seem to work correctly when I tested it. It just warns about topics
with only one partition, when multiple clients are trying to consume from
it:

"No broker partitions consumed by consumer thread"

One problem I have found is that when a topic is created, at first the
mirror is unable to consume messages instantly. It seems that only say 70%
of the messages (say 7,000 out of 10,000) that were sent to the newly created
topic make it to the mirror.

After the first batch, however, the messages seem to be mirrored correctly.

I checked the mirror maker process logs and found the following, just as
the topic was created:

[2016-06-28 11:56:46,339] WARN Error while fetching metadata with
correlation id 0 : {topictest4=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)
[2016-06-28 11:56:46,545] WARN Error while fetching metadata with
correlation id 1 : {topictest4=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)
[2016-06-28 11:56:46,649] WARN Error while fetching metadata with
correlation id 2 : {topictest4=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)

Is there any reason not all of the messages made it through? Is there a way
to reset the offset so it tries to sync the messages from the beginning?

Thanks!

On Tue, Jun 28, 2016 at 12:00 PM, Gerard Klijs 
wrote:

> With 3 nodes, I assume you mean 3 clusters? If I understand correctly, say
> you have 3 clusters, A, B, and C, you simultaneously:
> - want to copy from A and B to C, to get an aggregation in C
> - want to copy from A and C to B, to get a fail-back aggregation in B.
> Now what will happen when a message is produced in cluster A?
> - it will be copied to both C and B.
> - the copy will cause a new copy in C and B,
> etc.
> There are several ways out of this, depending on your use case. It's pretty
> easy to change the behaviour of the mirrormaker, for example to copy it to
> $topic-aggregation instead of $topic, and to not copy it when the topic
> ends with aggregation
>
> On Tue, Jun 28, 2016 at 10:15 AM cs user  wrote:
>
> > Hi All,
> >
> > So I understand I can run the following to aggregate topics from two
> > different clusters into a mirror:
> >
> > bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
> > sourceCluster1Consumer.config --consumer.config
> > sourceCluster2Consumer.config --num.streams 2 --producer.config
> > targetClusterProducer.config --whitelist=".*"
> >
> > Let's say my kafka mirror cluster consists of 3 nodes, can the above
> process
> > be started on each of the 3 nodes, so that in the event it fails on one
> the
> > other 2 will keep going?
> >
> > Or should only one of the nodes attempt to perform the aggregation?
> >
> > Thanks!
> >
>


Re: Kafka Streams reducebykey and tumbling window - need final windowed KTable values

2016-06-28 Thread Eno Thereska
Hi Clive,

As promised, here is the link to the KIP that just went out today. Feedback 
welcome:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams

Thanks
Eno

> On 27 Jun 2016, at 20:56, Eno Thereska  wrote:
> 
> Hi Clive,
> 
> We are working on exposing the state store behind a KTable as part of 
> allowing for queries to the structures currently hidden behind the language 
> (DSL). The KIP should be out today or tomorrow for you to have a look. You 
> can probably do what you need using the low-level processor API but then 
> you'd lose the benefits of the DSL and would have to maintain your own 
> structures.
> 
> Thanks,
> Eno
> 
>> On 26 Jun 2016, at 18:42, Clive Cox  wrote:
>> 
>> Following on from this thread, if I want to iterate over a KTable at the end
>> of its hopping/tumbling time window, how can I do this myself at present? Is
>> there a way to access these structures?
>> If this is not possible, it would seem I need to duplicate and manage
>> something similar to a list of windowed KTables myself, which is not really
>> ideal.
>> Thanks for any help,
>> Clive
>> 
>> 
>>   On Monday, 13 June 2016, 16:03, Eno Thereska  
>> wrote:
>> 
>> 
>> Hi Clive,
>> 
>> For now this optimisation is not present. We're working on it as part of 
>> KIP-63. One manual work-around might be to use a simple Key-value store to 
>> deduplicate the final output before sending to the backend. It could have a 
>> simple policy like "output all values at 1 second intervals" or "output 
>> after 10 records have been received".
>> 
>> Eno
>> 
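
A minimal shape for that key-value de-duplication idea, in plain Java; the
once-per-second flush policy and the backend write are placeholders:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class LatestValueFlusher {
        // Latest aggregate per key, overwritten on every KTable update.
        private final Map<String, Long> latest = new ConcurrentHashMap<>();
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        public LatestValueFlusher() {
            // Once per second, emit whatever has accumulated since the last flush.
            scheduler.scheduleAtFixedRate(this::flush, 1, 1, TimeUnit.SECONDS);
        }

        // Called from foreach() on every update of the windowed KTable.
        public void update(String key, long aggregate) {
            latest.put(key, aggregate);
        }

        private void flush() {
            for (Map.Entry<String, Long> e : latest.entrySet()) {
                sendToBackend(e.getKey(), e.getValue());
                latest.remove(e.getKey(), e.getValue()); // keeps newer updates
            }
        }

        private void sendToBackend(String key, long value) { /* app-specific */ }
    }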
>> 
>>> On 13 Jun 2016, at 13:36, Clive Cox  wrote:
>>> 
>>> 
>>> Thanks Eno for your comments and references.
>>> Perhaps I can explain what I want to achieve and maybe you can suggest the
>>> correct topology?
>>> I want to process a stream of events, do aggregation, and send the results to an
>>> analytics backend (InfluxDB), so that rather than sending 1000 points/sec
>>> to the analytics backend, I send a much lower rate. I'm only interested in
>>> using the processing time of the event, so in that respect there are no
>>> "late arriving" events. I was hoping I could use a tumbling window which,
>>> when its end-time has passed, lets me send the consolidated aggregation
>>> for that window and then throw the window away.
>>> 
>>> It sounds like from the references you give that this is not possible at 
>>> present in Kafka Streams?
>>> 
>>> Thanks,
>>> Clive 
>>> 
>>>On Monday, 13 June 2016, 11:32, Eno Thereska  
>>> wrote:
>>> 
>>> 
>>> Hi Clive,
>>> 
>>> The behaviour you are seeing is indeed correct (though not necessarily
>>> optimal in terms of performance, as described in this JIRA:
>>> https://issues.apache.org/jira/browse/KAFKA-3101)
>>> 
>>> The key observation is that windows never close/complete. There could 
>>> always be late arriving events that appear long after a window's end 
>>> interval and those need to be accounted for properly. In Kafka Streams that 
>>> means that such late arriving events continue to update the value of the 
>>> window. As described in the above JIRA, some optimisations could still be 
>>> possible (e.g., batch requests as described in KIP-63),
>>> however they are not implemented yet.
>>> 
>>> So your code needs to handle each update.
>>> 
>>> Thanks
>>> Eno
>>> 
>>> 
>>> 
 On 13 Jun 2016, at 11:13, Clive Cox  wrote:
 
 Hi,
 I would like to process a stream with a tumbling window of 5 secs, create
 aggregated stats for keys and push the final aggregates at the end of each
 window period to an analytics backend. I have tried doing something like:

   stream
     .map(...)
     .reduceByKey(..., TimeWindows.of("mywindow", 5000L), ...)
     .foreach { send stats }

 But I get every update to the KTable in the foreach.
 How do I just get the final values once the tumbling window is complete, so
 I can iterate over them and send to some external system?
 Thanks,
  Clive
 PS Using kafka_2.10-0.10.0.0
 
>>> 
>>> 
>> 
>> 
> 



Re: Mirror maker setup - multi node

2016-06-28 Thread Gerard Klijs
With 3 nodes, I assume you mean 3 clusters? If I understand correctly, say
you have 3 clusters, A, B, and C, you simultaneously:
- want to copy from A and B to C, to get an aggregation in C
- want to copy from A and C to B, to get a fail-back aggregation in B.
Now what will happen when a message is produced in cluster A?
- it will be copied to both C and B.
- the copy will cause a new copy in C and B,
etc.
There are several ways out of this, depending on your use case. It's pretty
easy to change the behaviour of the mirrormaker, for example to copy it to
$topic-aggregation instead of $topic, and to not copy it when the topic
ends with -aggregation (see the sketch below).
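
The hook for that is MirrorMaker's --message.handler option. A sketch of the
rename-and-drop logic, assuming the MirrorMakerMessageHandler interface as it
exists in the 0.9/0.10 tool -- treat the details as unverified:

    import java.util.Collections;
    import java.util.List;
    import kafka.consumer.BaseConsumerRecord;
    import kafka.tools.MirrorMaker;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class AggregationHandler implements MirrorMaker.MirrorMakerMessageHandler {
        @Override
        public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
            // Don't re-copy messages that are themselves copies.
            if (record.topic().endsWith("-aggregation"))
                return Collections.emptyList();
            // Copy everything else into $topic-aggregation on the target.
            return Collections.singletonList(new ProducerRecord<>(
                    record.topic() + "-aggregation", record.key(), record.value()));
        }
    }

Wired in with --message.handler AggregationHandler (class on MirrorMaker's
classpath).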

On Tue, Jun 28, 2016 at 10:15 AM cs user  wrote:

> Hi All,
>
> So I understand I can run the following to aggregate topics from two
> different clusters into a mirror:
>
> bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
> sourceCluster1Consumer.config --consumer.config
> sourceCluster2Consumer.config --num.streams 2 --producer.config
> targetClusterProducer.config --whitelist=".*"
>
> Let's say my kafka mirror cluster consists of 3 nodes, can the above process
> be started on each of the 3 nodes, so that in the event it fails on one the
> other 2 will keep going?
>
> Or should only one of the nodes attempt to perform the aggregation?
>
> Thanks!
>


Mirror maker setup - multi node

2016-06-28 Thread cs user
Hi All,

So I understand I can run the following to aggregate topics from two
different clusters into a mirror:

bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
sourceCluster1Consumer.config --consumer.config
sourceCluster2Consumer.config --num.streams 2 --producer.config
targetClusterProducer.config --whitelist=".*"

Let's say my kafka mirror cluster consists of 3 nodes, can the above process
be started on each of the 3 nodes, so that in the event it fails on one the
other 2 will keep going?

Or should only one of the nodes attempt to perform the aggregation?

Thanks!