Consumption on an explicitly (dynamically) created topic has a 5 minute delay

2017-02-21 Thread Jaikiran Pai
We are on Kafka 0.10.0.1 (server and client) and use Java 
consumer/producer APIs. We have an application where we create Kafka 
topics dynamically (using the AdminUtils Java API) and then start 
producing/consuming on those topics. The issue we frequently run into is 
this:


1. The application process creates a topic "foo-bar" via 
AdminUtils.createTopic. This completes successfully.
2. The same application process then creates a consumer (using the new 
Java consumer API) on that foo-bar topic as a next step.
3. The consumer created in step #2, however, doesn't get any partitions 
of this topic assigned, apparently because of this (notice the last 
line in the log):


2017-02-21 00:58:43,359 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
2017-02-21 00:58:43,360 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to 
topic(s): foo-bar
2017-02-21 00:58:43,543 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Received group coordinator response 
ClientResponse(receivedTimeMs=1487667523542, disconnected=false, 
request=ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f, 
request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, 
body={group_id=my-app-group}), createdTimeMs=1487667523378, 
sendTimeMs=1487667523529), 
responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
2017-02-21 00:58:43,543 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for 
group my-app-group.
2017-02-21 00:58:43,545 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
Revoking previously assigned partitions [] for group my-app-group
2017-02-21 00:58:43,545 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
(Re-)joining group my-app-group
2017-02-21 00:58:43,548 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Sending JoinGroup 
({group_id=my-app-group,session_timeout=3,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 
lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647 rack: null)
2017-02-21 00:58:43,548 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.bytes-sent
2017-02-21 00:58:43,549 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.bytes-received
2017-02-21 00:58:43,549 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.latency
2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Received successful join group response for group my-app-group: 
{error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0 
lim=59 cap=59]}]}
2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
Performing assignment for group my-app-group using strategy range with 
subscriptions 
{consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscription(topics=[foo-bar])}
*2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor - 
Skipping assignment for topic foo-bar since no metadata is available*



4. A few seconds later, a separate process produces some messages on the 
foo-bar topic (via the Java producer API).
5. The consumer created in step #2, although it is waiting for messages 
on the foo-bar topic, _doesn't_ consume these messages.
6. *5 minutes later* the Kafka server triggers a consumer rebalance, 
which then successfully assigns partition(s) of this foo-bar topic to 
the consumer created in step #2, and the consumer starts consuming these 
messages.


This 5 minute delay in consuming messages from this dynamically created 
topic is what we want to avoid. Is there any way I can deterministically 
force creation of a dynamic topic and be assured that, upon completion 
of that call, I can create a consumer and start consuming from that 
topic such that it receives messages as soon as they are produced, 
without having to wait for a 5 minute delay (or whatever the rebalance 
configuration is)? In essence, is there a way to ensure that the Kafka 
consumer immediately gets the topic metadata for a topic that was 
successfully created by the same application?
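
A workaround worth noting (a sketch, not from the original thread): the
Java consumer only refreshes its cluster metadata every
metadata.max.age.ms, which defaults to 300000 ms, i.e. exactly the 5
minute delay described above. Lowering that setting, and/or forcing a
metadata fetch for the topic before subscribing, should make a freshly
created topic visible much sooner:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FreshTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-app-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Refresh metadata every 5 s instead of the default 300000 ms, so
        // the assignor sees the just-created topic quickly.
        props.put("metadata.max.age.ms", "5000");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Blocks while fetching metadata for the topic, priming the
        // consumer's metadata cache before the group rebalance runs.
        consumer.partitionsFor("foo-bar");
        consumer.subscribe(Collections.singletonList("foo-bar"));
    }
}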



P.S: We have topic auto creation disabled, so this isn't really an auto 
topic creation issue. In our case we are 

How to stop Kafka Mirror Maker

2017-02-21 Thread Qian Zhu
Hi,
   For now, I am doing “kill -9 processID” to stop the Kafka Mirror Maker. 
I am wondering whether there is a better way (e.g. a command) to do so? I don’t 
expect to stop the mirror maker frequently, but I would like to have a script to 
automate the start and stop.

Thanks a lot!
Qian Zhu


Re: JMX metrics for replica lag time

2017-02-21 Thread Jun MA
Hi Guozhang,

Thanks for pointing this out. I was actually looking at this before, and that’s 
why I’m asking the question. This metric is 'lag in messages', and since the 
ISR logic now relies on lag in seconds, not lag in messages, I’m not sure how 
useful this metric is. In fact, we saw the value of this metric stay 0 all 
the time, even when there were ISR shrinks/expands. I’d expect to see an 
increase in lag when a shrink/expand happens. Is there a metric that can 
correctly represent the lag between followers and the leader?

Thanks,
Jun

> On Feb 21, 2017, at 10:19 AM, Guozhang Wang  wrote:
> 
> You can find them in https://kafka.apache.org/documentation/#monitoring
> 
> I think this is the one you are looking for:
> 
> Lag in messages per follower replica:
> kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)
> The lag should be proportional to the maximum batch size of a produce request.
> 
> On Mon, Feb 20, 2017 at 5:43 PM, Jun Ma  wrote:
> 
>> Hi Guozhang,
>> 
>> Thanks for your replay. Could you tell me which one indicates the lag
>> between follower and leader for a specific partition?
>> 
>> Thanks,
>> Jun
>> 
>> On Mon, Feb 20, 2017 at 4:57 PM, Guozhang Wang  wrote:
>> 
>>> I don't think the metrics have been changed in 0.9.0.1, in fact even in
>>> 0.10.x they are still the same as stated in:
>>> 
>>> https://kafka.apache.org/documentation/#monitoring
>>> 
>>> The mechanism for determine which followers have been dropped out of ISR
>>> has changed, but the metrics are not.
>>> 
>>> 
>>> Guozhang
>>> 
>>> 
>>> On Sun, Feb 19, 2017 at 7:56 PM, Jun MA  wrote:
>>> 
 Hi,
 
 I’m looking for the JMX metrics to represent replica lag time for
>>> 0.9.0.1.
 Base on the documentation, I can only find kafka.server:type=
 ReplicaFetcherManager,name=MaxLag,clientId=Replica, which is max lag
>> in
 messages btw follower and leader replicas. But since in 0.9.0.1 lag in
 messages is deprecated and replaced with lag time, I’m wondering what
>> is
 the corresponding metrics for this?
 
 Thanks,
 Jun
>>> 
>>> 
>>> 
>>> 
>>> --
>>> -- Guozhang
>>> 
>> 
> 
> 
> 
> -- 
> -- Guozhang



Kafka with SSL (question about certificate management)

2017-02-21 Thread Raghav
Hi

We have a Kafka client (producer) which periodically generates some data
and pushes it to a Kafka broker that is outside our network. We want to use
secure Kafka using SSL.

I read http://kafka.apache.org/documentation/#security, which explains how
to use Kafka with SSL.

I have a question: on my producer VM I have generated a public/private key
pair and a certificate. There is an independent CA (certificate authority)
which we don't know as of now. I am wondering if there are any free
*certificate signing services* online where I can make a REST-based call by
providing our unsigned certificate, and the CA will return a signed
certificate, along with its own certificate, which we can add to our
keystore and truststore?

I am sort of a newbie, so I am also interested in how CAs sign
certificates?
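
(Once a signed certificate is in the keystore and the CA certificate in
the truststore, the client side is plain configuration. A sketch, not
from the thread, using the standard Kafka SSL client properties; paths
and passwords are placeholders:)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SslProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker.example.com:9093"); // placeholder
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/var/private/ssl/client.truststore.jks");
        props.put("ssl.truststore.password", "changeit"); // placeholder
        props.put("ssl.keystore.location", "/var/private/ssl/client.keystore.jks");
        props.put("ssl.keystore.password", "changeit");   // placeholder
        props.put("ssl.key.password", "changeit");        // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}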

Any help is greatly appreciated.

Thanks.

-- 
Raghav


Re: KTable send old values API

2017-02-21 Thread Dmitry Minkovsky
Hi Eno,

Thank you. I don't think I'm advanced enough to imagine a good API. But I
can elaborate my use-cases further.

So say I have two tables:

KTable<String, String> left = topology.table(stringSerde,
stringSerde, topicLeft, topicLeft);
KTable<String, String> right = topology.table(stringSerde,
stringSerde, topicRight, topicRight);

left
  .leftJoin(right, (l, r) -> asList(l, r))
  .to(topicView)


   - I'd like to filter duplicates out of the change stream. I only want
     topicView to receive proper updates.

   - I'd like to be able to detect the change type easily:

     - oldValue == null and newValue != null => create
     - oldValue != null and newValue == null => delete
     - oldValue != null and newValue != null => update

   - I'd like to be able to update indices when records are deleted or
     updated. Old values are needed to determine which index keys should
     be updated or removed.


I can do all these things now, mostly with groupBy()/reduce(),
groupBy()/aggregate(), and transform().
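
For illustration (a sketch, not from the thread; all names here are
hypothetical), the change-type rules above map onto a small helper, which
also covers the duplicate-filtering case by treating an unchanged value
as a no-op:

import java.util.Objects;

public final class ChangeTypes {
    public enum ChangeType { CREATE, UPDATE, DELETE, NONE }

    // old == null, new != null => CREATE; old != null, new == null =>
    // DELETE; both non-null => UPDATE (or NONE if the value is unchanged).
    public static <V> ChangeType classify(V oldValue, V newValue) {
        if (oldValue == null) {
            return newValue == null ? ChangeType.NONE : ChangeType.CREATE;
        }
        if (newValue == null) {
            return ChangeType.DELETE;
        }
        return Objects.equals(oldValue, newValue) ? ChangeType.NONE
                                                  : ChangeType.UPDATE;
    }
}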


Best,
Dmitry

On Tue, Feb 21, 2017 at 5:21 PM, Eno Thereska 
wrote:

> Hi Dmitry,
>
> Could you tell us more on the exact API you'd like? Perhaps if others find
> it useful too we/you can do a KIP.
>
> Thanks
> Eno
>
> > On 21 Feb 2017, at 22:01, Dmitry Minkovsky  wrote:
> >
> > At KAFKA-2984: ktable sends old values when required
> > , @ymatsuda
> > writes:
> >
> >> NOTE: This is meant to be used by aggregation. But, if there is a use
> > case like a SQL database trigger, we can add a new KTable method to
> expose
> > this.
> >
> > Looking through the source it does not seem that this API was ever
> exposed.
> > Not finding anything on Google on this subject either. The SQL database
> > trigger is my exact use case. Enabling change-streaming for some tables
> > would help simplify my code. Is this possible? Is this scheduled for a
> > future version?
> >
> > Thank you,
> > Dmitry
>
>


Re: KTable send old values API

2017-02-21 Thread Eno Thereska
Hi Dmitry,

Could you tell us more on the exact API you'd like? Perhaps if others find it 
useful too we/you can do a KIP.

Thanks
Eno

> On 21 Feb 2017, at 22:01, Dmitry Minkovsky  wrote:
> 
> At KAFKA-2984: ktable sends old values when required
> , @ymatsuda
> writes:
> 
>> NOTE: This is meant to be used by aggregation. But, if there is a use
> case like a SQL database trigger, we can add a new KTable method to expose
> this.
> 
> Looking through the source it does not seem that this API was ever exposed.
> Not finding anything on Google on this subject either. The SQL database
> trigger is my exact use case. Enabling change-streaming for some tables
> would help simplify my code. Is this possible? Is this scheduled for a
> future version?
> 
> Thank you,
> Dmitry



Re: Heartbeats while consuming a message in kafka-python

2017-02-21 Thread Jeff Widman
Yes, in the linked issues in my email it shows both projects plan to add
support for it.

The problem with a high session.timeout.ms is that if you've got a message
that locks your consumer in a perpetual processing cycle, the consumer
will time out of the group without rejoining. Then another consumer in your
group can get locked. Pretty quickly your whole consumer group can be down
due to that single message.

If instead you've got a decoupled heartbeat, then your stuck consumer won't
drop out of the group, so no rebalance will be triggered, and the other
consumers will happily continue consuming their other partitions.

While neither is a great solution, I'd rather have the latter as at least
some of your partitions are still working fine rather than bringing your
entire pipeline to a complete halt.





On Tue, Feb 21, 2017 at 1:50 PM, Guozhang Wang  wrote:

> Hello Jeff,
>
> You are right that currently kafka-python does not expose two configs, i.e.
> session.timeout.ms and max.poll.timeout.ms as the Java client does, but I
> think the current setting may be sufficient to tackle Martin's issue alone
> as long as session.timeout.ms can be tuned to be large enough; like you
> said the two configs are for separating the cases between "hard failures
> detected by heartbeat" and "soft failures like long GC / blocked on IO /
> simply processing a record takes long time".
>
> BTW I believe fully supporting KIP-62 is on the roadmap of kafka-python
> already, and maybe Magnus (cc'ed) can explain a bit more on the timeline of
> it.
>
>
> Guozhang
>
>
> On Tue, Feb 21, 2017 at 12:06 PM, Jeff Widman  wrote:
>
> > As far as I understood it, the primary thrust of KIP-62 was making it so
> > heartbeats could be issued outside of the poll() loop, meaning that the
> > session.timeout.ms could be reduced below the length of time it takes a
> > consumer to process a particular batch of messages.
> >
> > Unfortunately, while both librdkafka (which confluent-kafka-python relies
> > on under the covers) and kafka-python support issuing heartbeats from a
> > background thread, they both currently still tie that heartbeat to the
> > poll() call. So the developer still has to manually tune max.poll.records
> > and session.timeout.ms, versus once this background heartbeating is
> > decoupling from polling, the defaults should be fine for more use-cases.
> >
> > I actually filed tickets against both projects a few weeks ago for this:
> > https://github.com/edenhill/librdkafka/issues/1039
> > https://github.com/dpkp/kafka-python/issues/948
> >
> >
> >
> > On Tue, Feb 21, 2017 at 11:01 AM, Guozhang Wang 
> > wrote:
> >
> > > Hi Martin,
> > >
> > > Since 0.10.1 KIP-62 has been added to consumer client, so that the user
> > > does not need to manually call pause / resume.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> > >
> > > As for python client, as far as I know this background thread approach
> > has
> > > also been adopted in the Confluent's open source kafka-python client as
> > > well:
> > >
> > > https://github.com/confluentinc/confluent-kafka-python
> > >
> > > So that as long as you are willing to set a `session.timeout.ms` high
> > > enough to cover the maximum processing latency of a single record, the
> > > background thread will be responsible for sending heartbeats and hence
> > > users do not need to worry about them.
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Feb 21, 2017 at 7:24 AM, Martin Sucha 
> > wrote:
> > >
> > > > Hello,
> > > >
> > > > I'm processing messages using kafka-python from a single topic, but
> > > > ocassionally I have a message that might take a while to process, so
> > I'd
> > > > like to send heartbeats while processing the message.
> > > >
> > > > My program works something like this:
> > > >
> > > > consumer = KafkaConsumer(...)
> > > > for message in consumer:
> > > > if should_store(message.value):
> > > > store_state(message.value)
> > > > elif should_process(message.value):
> > > > # This loop might take a while (longer than heartbeat
> interval)
> > > > for value in stored_state(message.value):
> > > > do_something(value)
> > > >
> > > > I think I need to call consumer.client.poll() at regular intervals to
> > > send
> > > > the heartbeats, so the code would look like this:
> > > >
> > > > consumer = KafkaConsumer(...)
> > > > for message in consumer:
> > > > if should_store(message.value):
> > > > store_state(message.value)
> > > > elif should_process(message.value):
> > > > # This loop might take a while (longer that heartbeat
> interval)
> > > > for index, value in enumerate(stored_state(message.value)):
> > > > do_something(value)
> > > > if index % 1 == 0:
> > > > 

KTable send old values API

2017-02-21 Thread Dmitry Minkovsky
At KAFKA-2984: ktable sends old values when required
, @ymatsuda
writes:

> NOTE: This is meant to be used by aggregation. But, if there is a use
case like a SQL database trigger, we can add a new KTable method to expose
this.

Looking through the source it does not seem that this API was ever exposed.
Not finding anything on Google on this subject either. The SQL database
trigger is my exact use case. Enabling change-streaming for some tables
would help simplify my code. Is this possible? Is this scheduled for a
future version?

Thank you,
Dmitry


Re: Heartbeats while consuming a message in kafka-python

2017-02-21 Thread Guozhang Wang
Hello Jeff,

You are right that currently kafka-python does not expose the two configs,
i.e. session.timeout.ms and max.poll.interval.ms, as the Java client does,
but I think the current setting may be sufficient to tackle Martin's issue
alone as long as session.timeout.ms can be tuned to be large enough; like
you said, the two configs are for separating the cases between "hard
failures detected by heartbeat" and "soft failures like long GC / blocked
on IO / simply processing a record takes a long time".

BTW I believe fully supporting KIP-62 is on the roadmap of kafka-python
already, and maybe Magnus (cc'ed) can explain a bit more on the timeline of
it.


Guozhang


On Tue, Feb 21, 2017 at 12:06 PM, Jeff Widman  wrote:

> As far as I understood it, the primary thrust of KIP-62 was making it so
> heartbeats could be issued outside of the poll() loop, meaning that the
> session.timeout.ms could be reduced below the length of time it takes a
> consumer to process a particular batch of messages.
>
> Unfortunately, while both librdkafka (which confluent-kafka-python relies
> on under the covers) and kafka-python support issuing heartbeats from a
> background thread, they both currently still tie that heartbeat to the
> poll() call. So the developer still has to manually tune max.poll.records
> and session.timeout.ms, versus once this background heartbeating is
> decoupling from polling, the defaults should be fine for more use-cases.
>
> I actually filed tickets against both projects a few weeks ago for this:
> https://github.com/edenhill/librdkafka/issues/1039
> https://github.com/dpkp/kafka-python/issues/948
>
>
>
> On Tue, Feb 21, 2017 at 11:01 AM, Guozhang Wang 
> wrote:
>
> > Hi Martin,
> >
> > Since 0.10.1 KIP-62 has been added to consumer client, so that the user
> > does not need to manually call pause / resume.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> >
> > As for python client, as far as I know this background thread approach
> has
> > also been adopted in the Confluent's open source kafka-python client as
> > well:
> >
> > https://github.com/confluentinc/confluent-kafka-python
> >
> > So that as long as you are willing to set a `session.timeout.ms` high
> > enough to cover the maximum processing latency of a single record, the
> > background thread will be responsible for sending heartbeats and hence
> > users do not need to worry about them.
> >
> > Guozhang
> >
> >
> > On Tue, Feb 21, 2017 at 7:24 AM, Martin Sucha 
> wrote:
> >
> > > Hello,
> > >
> > > I'm processing messages using kafka-python from a single topic, but
> > > ocassionally I have a message that might take a while to process, so
> I'd
> > > like to send heartbeats while processing the message.
> > >
> > > My program works something like this:
> > >
> > > consumer = KafkaConsumer(...)
> > > for message in consumer:
> > > if should_store(message.value):
> > > store_state(message.value)
> > > elif should_process(message.value):
> > > # This loop might take a while (longer than heartbeat interval)
> > > for value in stored_state(message.value):
> > > do_something(value)
> > >
> > > I think I need to call consumer.client.poll() at regular intervals to
> > send
> > > the heartbeats, so the code would look like this:
> > >
> > > consumer = KafkaConsumer(...)
> > > for message in consumer:
> > > if should_store(message.value):
> > > store_state(message.value)
> > > elif should_process(message.value):
> > > # This loop might take a while (longer that heartbeat interval)
> > > for index, value in enumerate(stored_state(message.value)):
> > > do_something(value)
> > > if index % 1 == 0:
> > > consumer.client.poll()
> > >
> > > Is calling KafkaClient.poll() like this safe to do? The documentation
> for
> > > KafkaConsumer.poll() says it is incompatible with the iterator
> interface
> > > but nothing like that is in KafkaClient.poll() documentation.
> > >
> > > Also, there is KafkaConsumer.pause(). Do I need to pause the partitions
> > I'm
> > > fetching from before calling consumer.client.poll()? Based on what I
> have
> > > seen in the code it looks like calling pause() will discard the
> buffered
> > > messages fetched for that partition so far and then fetch them again
> when
> > > calling resume(). Is that correct? In this case I'd rather not call
> > pause()
> > > if it is not necessary.
> > >
> > > Thanks for clarification.
> > >
> > > Best Regards,
> > > Martin
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: hitting the throughput limit on a cluster?

2017-02-21 Thread Jon Yeargers
Thanks for looking at this issue. I checked the max IOPS for this disk and
we're only at about 10%. I can add more disks to spread out the work.

What IOWait values should I be aiming for?

Also, what do you set open files to? I have it at 65535, but I just read a
doc that suggested > 100K is better.


On Tue, Feb 21, 2017 at 10:45 AM, Todd Palino  wrote:

> So I think the important thing to look at here is the IO wait on your
> system. You’re hitting disk throughput issues, and that’s what you most
> likely need to resolve. So just from what you’ve described, I think the
> only thing that is going to get you more performance is more spindles (or
> faster spindles). This is either more disks or more brokers, but at the end
> of it you need to eliminate the disk IO bottleneck.
>
> -Todd
>
>
> On Tue, Feb 21, 2017 at 7:29 AM, Jon Yeargers 
> wrote:
>
> > Running 3x 8core on google compute.
> >
> > Topic has 16 partitions (replication factor 2) and is consumed by 16
> docker
> > containers on individual hosts.
> >
> > System seems to max out at around 4 messages / minute. Each message
> is
> > ~12K - compressed (snappy) JSON.
> >
> > Recently moved from 12 to the above 16 partitions with no change in
> > throughput.
> >
> > Also tried increased the consumption capacity on each container by 50%.
> No
> > effect.
> >
> > Network is running at ~6Gb/sec (measured using iperf3). Broker load is
> > ~1.5. IOWait % is 5-10 (via sar).
> >
> > What are my options for adding throughput?
> >
> > - more brokers?
> > - avro/protobuf messaging?
> > - more disks / broker? (1 / host presently)
> > - jumbo frames?
> >
> > (transparent huge pages is disabled)
> >
> >
> > Looking at this article (
> > https://engineering.linkedin.com/kafka/benchmarking-apache-
> > kafka-2-million-writes-second-three-cheap-machines)
> > it would appear that for our message size we are at the max. This would
> > argue that we need to shrink the message size - so perhaps switching to
> > avro is the next step?
> >
>
>
>
> --
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino
>


Re: Heartbeats while consuming a message in kafka-python

2017-02-21 Thread Jeff Widman
As far as I understood it, the primary thrust of KIP-62 was making it so
heartbeats could be issued outside of the poll() loop, meaning that the
session.timeout.ms could be reduced below the length of time it takes a
consumer to process a particular batch of messages.

Unfortunately, while both librdkafka (which confluent-kafka-python relies
on under the covers) and kafka-python support issuing heartbeats from a
background thread, they both currently still tie that heartbeat to the
poll() call. So the developer still has to manually tune max.poll.records
and session.timeout.ms; once this background heartbeating is decoupled
from polling, the defaults should be fine for more use-cases.
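
For reference, a sketch (not from the thread; values are illustrative,
not recommendations) of the two settings on the post-KIP-62 Java client
(0.10.1+), where the background heartbeat makes them independent:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Kip62ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Heartbeats come from a background thread, so this only needs to
        // cover hard-failure detection, not processing time.
        props.put("session.timeout.ms", "10000");
        // Upper bound on processing time between poll() calls before the
        // consumer is considered to have left the group.
        props.put("max.poll.interval.ms", "600000");
        props.put("max.poll.records", "100");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            System.out.println("Fetched " + records.count() + " records");
        }
    }
}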

I actually filed tickets against both projects a few weeks ago for this:
https://github.com/edenhill/librdkafka/issues/1039
https://github.com/dpkp/kafka-python/issues/948



On Tue, Feb 21, 2017 at 11:01 AM, Guozhang Wang  wrote:

> Hi Martin,
>
> Since 0.10.1 KIP-62 has been added to consumer client, so that the user
> does not need to manually call pause / resume.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
>
> As for python client, as far as I know this background thread approach has
> also been adopted in the Confluent's open source kafka-python client as
> well:
>
> https://github.com/confluentinc/confluent-kafka-python
>
> So that as long as you are willing to set a `session.timeout.ms` high
> enough to cover the maximum processing latency of a single record, the
> background thread will be responsible for sending heartbeats and hence
> users do not need to worry about them.
>
> Guozhang
>
>
> On Tue, Feb 21, 2017 at 7:24 AM, Martin Sucha  wrote:
>
> > Hello,
> >
> > I'm processing messages using kafka-python from a single topic, but
> > ocassionally I have a message that might take a while to process, so I'd
> > like to send heartbeats while processing the message.
> >
> > My program works something like this:
> >
> > consumer = KafkaConsumer(...)
> > for message in consumer:
> > if should_store(message.value):
> > store_state(message.value)
> > elif should_process(message.value):
> > # This loop might take a while (longer than heartbeat interval)
> > for value in stored_state(message.value):
> > do_something(value)
> >
> > I think I need to call consumer.client.poll() at regular intervals to
> send
> > the heartbeats, so the code would look like this:
> >
> > consumer = KafkaConsumer(...)
> > for message in consumer:
> > if should_store(message.value):
> > store_state(message.value)
> > elif should_process(message.value):
> > # This loop might take a while (longer that heartbeat interval)
> > for index, value in enumerate(stored_state(message.value)):
> > do_something(value)
> > if index % 1 == 0:
> > consumer.client.poll()
> >
> > Is calling KafkaClient.poll() like this safe to do? The documentation for
> > KafkaConsumer.poll() says it is incompatible with the iterator interface
> > but nothing like that is in KafkaClient.poll() documentation.
> >
> > Also, there is KafkaConsumer.pause(). Do I need to pause the partitions
> I'm
> > fetching from before calling consumer.client.poll()? Based on what I have
> > seen in the code it looks like calling pause() will discard the buffered
> > messages fetched for that partition so far and then fetch them again when
> > calling resume(). Is that correct? In this case I'd rather not call
> pause()
> > if it is not necessary.
> >
> > Thanks for clarification.
> >
> > Best Regards,
> > Martin
> >
>
>
>
> --
> -- Guozhang
>


Re: Implementing a non-key in Kafka Streams using the Processor API

2017-02-21 Thread Guozhang Wang
Thanks for sharing, Jan. I think it would help if you can share a sketch of
your code snippet illustrating the implementation.

As for the recent development, assuming you are referring to KIP-120 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API),
we are still discussing on that thread and would definitely want to hear
your feedbacks, so please feel free to send them on that email thread. But
just to clarify we are not intending to remove any useful features from
PAPI, and if you can take look at the functions to be deprecated, they are
only public because they need to be accessed by other Streams classes not
siting in the same package, or because they are used by DSL and hence
should not be exposed to users. If you observe that there are indeed some
usage patterns that are being disabled because of this proposal, please let
us know.


Guozhang



On Tue, Feb 21, 2017 at 11:32 AM, Jan Filipiak 
wrote:

> Hi,
>
> yeah if the proposed solution is doable (only constrain really is to not
> have a parent key with lots of children) completly in the DSL except the
> lateral view
> wich is a pretty easy thing in PAPI.
>
> Our own implementation is a mix of reusing DSL interfaces but using
> reflection against KTableImpl to drop down to PAPI. Probably one limiting
> factor why i am not that eager to share publicly, cause its kinda ugly. The
> development at the moment (removing many featueres from PAPI) is very
> worrisome for me, so I should get moving having upstream support.
>
> regarding the output key, we forced the user to pick a combined key
> parent+child_id, this works out pretty nicely as you get access to the
> partition information in the partitioner also in the delete cases + on the
> recieving side you can use just a regular KTableSource to materialze and
> have the parent key as prefix automatically. + It will do the naturally
> correct thing if you update parent_id in the child table. Upstream support
> would also be helpfull as the statestores are changelog even though we can
> use the intermediate topic for state store high availability.
>
> Best Jan
>
>
> On 21.02.2017 20:15, Guozhang Wang wrote:
>
>> Jan,
>>
>> Sure I would love to hear what you did for non-key joins. Last time we
>> chatted there are discussions on the ordering issue, that we HAVE TO
>> augment the join result stream keys as a combo of both, which may not be
>> elegant as used in the DSL.
>>
>> For your proposed solution, it seems you did not do that on the DSL but at
>> the PAPI layer, right?
>>
>> Guozhang
>>
>> On Tue, Feb 21, 2017 at 6:05 AM, Jan Filipiak 
>> wrote:
>>
>> Just a little note here:
>>>
>>> if you can take all rows of the "children" table for each key into
>>> memory,
>>> you get get away by using group_by and make a list of them. With this
>>> aggregation the join is straight forward and you can use a lateral view
>>> later to get to the same result. For this you could use the current DSL
>>> to
>>> a greater extend.
>>>
>>> Best Jan
>>>
>>> On 21.02.2017 13:10, Frank Lyaruu wrote:
>>>
>>> I've read that JIRA (although I don't understand every single thing), and
 I
 got the feeling it is not exactly the same problem.
 I am aware of the Global Tables, and I've tried that first, but I seem
 unable to do what I need to do.

 I'm replicating a relational database, and on a one-to-many relationship
 I'd like to publish a joined message if either of the source streams
 receives an update.

 In the Global Table Wiki:
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+
 Add+Global+Tables+to+Kafka+Streams

 I see this:
 "The GlobalKTable will only be used for doing lookups. That is, data
 arriving in the GlobalKTable will not trigger the join. "

 So how would I go about doing this?
 regards, Frank



 On Tue, Feb 21, 2017 at 10:38 AM, Eno Thereska 
 wrote:

 Hi Frank,

> As far as I know the design in that wiki has been superceded by the
> Global
> KTables design which is now coming in 0.10.2. Hence, the JIRAs that are
> mentioned there (like KAFKA-3705). There are some extensive comments in
> https://issues.apache.org/jira/browse/KAFKA-3705 <
> https://issues.apache.org/jira/browse/KAFKA-3705> illustrating why
> this
> design is particularly challenging and why Global KTables was chosen
> instead. I'm not sure if you still want to pursue that original design,
> since it is not proven to work.
>
> Guozhang, perhaps we need to add a note saying that Global KTables is
> the
> new design?
>
> Thanks
> Eno
>
> On 21 Feb 2017, at 07:35, Frank Lyaruu  wrote:
>
>> Hi all,
>>
>> I'm trying to implement joining two Kafka tables using a 'remote' key,
>> basically as 

Re: Implementing a non-key in Kafka Streams using the Processor API

2017-02-21 Thread Jan Filipiak

Hi,

yeah, the proposed solution is doable (the only real constraint is not to
have a parent key with lots of children) completely in the DSL, except the
lateral view, which is a pretty easy thing in the PAPI.

Our own implementation is a mix of reusing DSL interfaces but using
reflection against KTableImpl to drop down to the PAPI. That is probably
one limiting factor in why I am not that eager to share it publicly,
because it's kind of ugly. The current development direction (removing
many features from the PAPI) is very worrisome for me, so I should get
moving on having upstream support.


Regarding the output key, we forced the user to pick a combined key
parent+child_id. This works out pretty nicely, as you get access to the
partition information in the partitioner also in the delete cases, and on
the receiving side you can use just a regular KTableSource to materialize
and have the parent key as a prefix automatically. It will also do the
naturally correct thing if you update parent_id in the child table.
Upstream support would also be helpful, as the state stores are
changelogged, even though we can use the intermediate topic for state
store high availability.
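
(A hedged illustration, not from the thread, of the combined-key idea:
partition on the parent id prefix only, so a parent and all of its
children land on the same partition. The hash scheme is an assumption,
not Kafka's default partitioner; Java records are used for brevity.)

public class CombinedKeySketch {
    record CombinedKey(String parentId, String childId) {}

    // Partition on the parent id only, ignoring the child id, so every
    // record for a given parent co-locates on one partition.
    static int partitionFor(CombinedKey key, int numPartitions) {
        return (key.parentId().hashCode() & 0x7fffffff) % numPartitions;
    }
}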


Best Jan

On 21.02.2017 20:15, Guozhang Wang wrote:

Jan,

Sure I would love to hear what you did for non-key joins. Last time we
chatted there are discussions on the ordering issue, that we HAVE TO
augment the join result stream keys as a combo of both, which may not be
elegant as used in the DSL.

For your proposed solution, it seems you did not do that on the DSL but at
the PAPI layer, right?

Guozhang

On Tue, Feb 21, 2017 at 6:05 AM, Jan Filipiak 
wrote:


Just a little note here:

if you can take all rows of the "children" table for each key into memory,
you get get away by using group_by and make a list of them. With this
aggregation the join is straight forward and you can use a lateral view
later to get to the same result. For this you could use the current DSL to
a greater extend.

Best Jan

On 21.02.2017 13:10, Frank Lyaruu wrote:


I've read that JIRA (although I don't understand every single thing), and
I
got the feeling it is not exactly the same problem.
I am aware of the Global Tables, and I've tried that first, but I seem
unable to do what I need to do.

I'm replicating a relational database, and on a one-to-many relationship
I'd like to publish a joined message if either of the source streams
receives an update.

In the Global Table Wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+
Add+Global+Tables+to+Kafka+Streams

I see this:
"The GlobalKTable will only be used for doing lookups. That is, data
arriving in the GlobalKTable will not trigger the join. "

So how would I go about doing this?
regards, Frank



On Tue, Feb 21, 2017 at 10:38 AM, Eno Thereska 
wrote:

Hi Frank,

As far as I know the design in that wiki has been superceded by the
Global
KTables design which is now coming in 0.10.2. Hence, the JIRAs that are
mentioned there (like KAFKA-3705). There are some extensive comments in
https://issues.apache.org/jira/browse/KAFKA-3705 <
https://issues.apache.org/jira/browse/KAFKA-3705> illustrating why this
design is particularly challenging and why Global KTables was chosen
instead. I'm not sure if you still want to pursue that original design,
since it is not proven to work.

Guozhang, perhaps we need to add a note saying that Global KTables is the
new design?

Thanks
Eno

On 21 Feb 2017, at 07:35, Frank Lyaruu  wrote:

Hi all,

I'm trying to implement joining two Kafka tables using a 'remote' key,
basically as described here:

https://cwiki.apache.org/confluence/display/KAFKA/


Discussion%3A+Non-key+KTable-KTable+Joins


Under the "Implementation Details" there is one line I don't know how to
do:


1. First of all, we will repartition this KTable's stream, by key
computed from the *mapper(K, V) → K1*, so that it is co-partitioned
by
the same key. The co-partition topic is partitioned on the new key,


but the


message key and value are unchanged, and log compaction is turned
off.


How do I do that? I've been unable to find any documentation, I've
looked
at the StreamPartitionAssignor, that seems relevant, but I could use
some
help. Does anyone have an example?

regards, Frank









Re: Implementing a non-key in Kafka Streams using the Processor API

2017-02-21 Thread Guozhang Wang
Jan,

Sure, I would love to hear what you did for non-key joins. Last time we
chatted there were discussions on the ordering issue, namely that we HAVE TO
augment the join result stream keys as a combo of both, which may not be
elegant when used in the DSL.

For your proposed solution, it seems you did not do that on the DSL but at
the PAPI layer, right?

Guozhang

On Tue, Feb 21, 2017 at 6:05 AM, Jan Filipiak 
wrote:

> Just a little note here:
>
> if you can take all rows of the "children" table for each key into memory,
> you get get away by using group_by and make a list of them. With this
> aggregation the join is straight forward and you can use a lateral view
> later to get to the same result. For this you could use the current DSL to
> a greater extend.
>
> Best Jan
>
> On 21.02.2017 13:10, Frank Lyaruu wrote:
>
>> I've read that JIRA (although I don't understand every single thing), and
>> I
>> got the feeling it is not exactly the same problem.
>> I am aware of the Global Tables, and I've tried that first, but I seem
>> unable to do what I need to do.
>>
>> I'm replicating a relational database, and on a one-to-many relationship
>> I'd like to publish a joined message if either of the source streams
>> receives an update.
>>
>> In the Global Table Wiki:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+
>> Add+Global+Tables+to+Kafka+Streams
>>
>> I see this:
>> "The GlobalKTable will only be used for doing lookups. That is, data
>> arriving in the GlobalKTable will not trigger the join. "
>>
>> So how would I go about doing this?
>> regards, Frank
>>
>>
>>
>> On Tue, Feb 21, 2017 at 10:38 AM, Eno Thereska 
>> wrote:
>>
>> Hi Frank,
>>>
>>> As far as I know the design in that wiki has been superceded by the
>>> Global
>>> KTables design which is now coming in 0.10.2. Hence, the JIRAs that are
>>> mentioned there (like KAFKA-3705). There are some extensive comments in
>>> https://issues.apache.org/jira/browse/KAFKA-3705 <
>>> https://issues.apache.org/jira/browse/KAFKA-3705> illustrating why this
>>> design is particularly challenging and why Global KTables was chosen
>>> instead. I'm not sure if you still want to pursue that original design,
>>> since it is not proven to work.
>>>
>>> Guozhang, perhaps we need to add a note saying that Global KTables is the
>>> new design?
>>>
>>> Thanks
>>> Eno
>>>
>>> On 21 Feb 2017, at 07:35, Frank Lyaruu  wrote:

 Hi all,

 I'm trying to implement joining two Kafka tables using a 'remote' key,
 basically as described here:

 https://cwiki.apache.org/confluence/display/KAFKA/

>>> Discussion%3A+Non-key+KTable-KTable+Joins
>>>
 Under the "Implementation Details" there is one line I don't know how to
 do:


1. First of all, we will repartition this KTable's stream, by key
computed from the *mapper(K, V) → K1*, so that it is co-partitioned
 by
the same key. The co-partition topic is partitioned on the new key,

>>> but the
>>>
message key and value are unchanged, and log compaction is turned
 off.


 How do I do that? I've been unable to find any documentation, I've
 looked
 at the StreamPartitionAssignor, that seems relevant, but I could use
 some
 help. Does anyone have an example?

 regards, Frank

>>>
>>>
>


-- 
-- Guozhang


Re: Heartbeats while consuming a message in kafka-python

2017-02-21 Thread Guozhang Wang
Hi Martin,

Since 0.10.1, KIP-62 has been added to the consumer client, so the user
does not need to manually call pause / resume.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-
62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread

As for python client, as far as I know this background thread approach has
also been adopted in the Confluent's open source kafka-python client as
well:

https://github.com/confluentinc/confluent-kafka-python

So that as long as you are willing to set a `session.timeout.ms` high
enough to cover the maximum processing latency of a single record, the
background thread will be responsible for sending heartbeats and hence
users do not need to worry about them.

Guozhang


On Tue, Feb 21, 2017 at 7:24 AM, Martin Sucha  wrote:

> Hello,
>
> I'm processing messages using kafka-python from a single topic, but
> ocassionally I have a message that might take a while to process, so I'd
> like to send heartbeats while processing the message.
>
> My program works something like this:
>
> consumer = KafkaConsumer(...)
> for message in consumer:
> if should_store(message.value):
> store_state(message.value)
> elif should_process(message.value):
> # This loop might take a while (longer than heartbeat interval)
> for value in stored_state(message.value):
> do_something(value)
>
> I think I need to call consumer.client.poll() at regular intervals to send
> the heartbeats, so the code would look like this:
>
> consumer = KafkaConsumer(...)
> for message in consumer:
> if should_store(message.value):
> store_state(message.value)
> elif should_process(message.value):
> # This loop might take a while (longer that heartbeat interval)
> for index, value in enumerate(stored_state(message.value)):
> do_something(value)
> if index % 1 == 0:
> consumer.client.poll()
>
> Is calling KafkaClient.poll() like this safe to do? The documentation for
> KafkaConsumer.poll() says it is incompatible with the iterator interface
> but nothing like that is in KafkaClient.poll() documentation.
>
> Also, there is KafkaConsumer.pause(). Do I need to pause the partitions I'm
> fetching from before calling consumer.client.poll()? Based on what I have
> seen in the code it looks like calling pause() will discard the buffered
> messages fetched for that partition so far and then fetch them again when
> calling resume(). Is that correct? In this case I'd rather not call pause()
> if it is not necessary.
>
> Thanks for clarification.
>
> Best Regards,
> Martin
>



-- 
-- Guozhang


Re: hitting the throughput limit on a cluster?

2017-02-21 Thread Todd Palino
So I think the important thing to look at here is the IO wait on your
system. You’re hitting disk throughput issues, and that’s what you most
likely need to resolve. So just from what you’ve described, I think the
only thing that is going to get you more performance is more spindles (or
faster spindles). This is either more disks or more brokers, but at the end
of it you need to eliminate the disk IO bottleneck.

-Todd


On Tue, Feb 21, 2017 at 7:29 AM, Jon Yeargers 
wrote:

> Running 3x 8core on google compute.
>
> Topic has 16 partitions (replication factor 2) and is consumed by 16 docker
> containers on individual hosts.
>
> System seems to max out at around 4 messages / minute. Each message is
> ~12K - compressed (snappy) JSON.
>
> Recently moved from 12 to the above 16 partitions with no change in
> throughput.
>
> Also tried increased the consumption capacity on each container by 50%. No
> effect.
>
> Network is running at ~6Gb/sec (measured using iperf3). Broker load is
> ~1.5. IOWait % is 5-10 (via sar).
>
> What are my options for adding throughput?
>
> - more brokers?
> - avro/protobuf messaging?
> - more disks / broker? (1 / host presently)
> - jumbo frames?
>
> (transparent huge pages is disabled)
>
>
> Looking at this article (
> https://engineering.linkedin.com/kafka/benchmarking-apache-
> kafka-2-million-writes-second-three-cheap-machines)
> it would appear that for our message size we are at the max. This would
> argue that we need to shrink the message size - so perhaps switching to
> avro is the next step?
>



-- 
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: JMX metrics for replica lag time

2017-02-21 Thread Guozhang Wang
You can find them in https://kafka.apache.org/documentation/#monitoring

I think this is the one you are looking for:

Lag in messages per follower replica:
kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)
The lag should be proportional to the maximum batch size of a produce request.
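
(A hedged sketch, not from the thread, of reading that MBean over JMX.
The JMX port, clientId, topic, and partition are placeholders, and the
attribute name "Value" is how Yammer gauges are usually exposed:)

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ReplicaLagReader {
    public static void main(String[] args) throws Exception {
        // Assumes the broker exposes JMX on localhost:9999.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Placeholder clientId/topic/partition; match these to an
            // actual replica fetcher thread on the broker.
            ObjectName lagBean = new ObjectName(
                    "kafka.server:type=FetcherLagMetrics,name=ConsumerLag,"
                    + "clientId=ReplicaFetcherThread-0-1,topic=foo-bar,partition=0");
            Object lag = conn.getAttribute(lagBean, "Value");
            System.out.println("Replica lag in messages: " + lag);
        }
    }
}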

On Mon, Feb 20, 2017 at 5:43 PM, Jun Ma  wrote:

> Hi Guozhang,
>
> Thanks for your replay. Could you tell me which one indicates the lag
> between follower and leader for a specific partition?
>
> Thanks,
> Jun
>
> On Mon, Feb 20, 2017 at 4:57 PM, Guozhang Wang  wrote:
>
> > I don't think the metrics have been changed in 0.9.0.1, in fact even in
> > 0.10.x they are still the same as stated in:
> >
> > https://kafka.apache.org/documentation/#monitoring
> >
> > The mechanism for determine which followers have been dropped out of ISR
> > has changed, but the metrics are not.
> >
> >
> > Guozhang
> >
> >
> > On Sun, Feb 19, 2017 at 7:56 PM, Jun MA  wrote:
> >
> > > Hi,
> > >
> > > I’m looking for the JMX metrics to represent replica lag time for
> > 0.9.0.1.
> > > Base on the documentation, I can only find kafka.server:type=
> > > ReplicaFetcherManager,name=MaxLag,clientId=Replica, which is max lag
> in
> > > messages btw follower and leader replicas. But since in 0.9.0.1 lag in
> > > messages is deprecated and replaced with lag time, I’m wondering what
> is
> > > the corresponding metrics for this?
> > >
> > > Thanks,
> > > Jun
> >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Kafka collapsed due to OutOfMemory

2017-02-21 Thread Fernando Bugni
Hi,

I have a 3-node Kafka cluster in production with 1 GB for each JVM. Suddenly
one day, two of them went down and the remaining one tried to handle all the
work. The log errors said they could not synchronize. We triggered a
rebalance and it never finished. Unfortunately, we had no GC log, so we had
to add one, and then we realized that each JVM was out of memory.

We changed line 29 of kafka-server-start.sh to give each one 4 GB, and
everything worked perfectly again. So we think it could be a good idea to
add a comment in this script explaining that the memory may need to be
increased, and/or a variable to set the memory easily. Also, we then
noticed that LinkedIn runs with 6 GB, per
https://kafka.apache.org/documentation/#java.

Perhaps this is something worth taking into consideration for production
environments and for new users...

Regards,

-- 
Fernando Bugni


hitting the throughput limit on a cluster?

2017-02-21 Thread Jon Yeargers
Running 3x 8core on google compute.

Topic has 16 partitions (replication factor 2) and is consumed by 16 docker
containers on individual hosts.

System seems to max out at around 4 messages / minute. Each message is
~12K - compressed (snappy) JSON.

Recently moved from 12 to the above 16 partitions with no change in
throughput.

Also tried increasing the consumption capacity on each container by 50%. No
effect.

Network is running at ~6Gb/sec (measured using iperf3). Broker load is
~1.5. IOWait % is 5-10 (via sar).

What are my options for adding throughput?

- more brokers?
- avro/protobuf messaging?
- more disks / broker? (1 / host presently)
- jumbo frames?

(transparent huge pages is disabled)


Looking at this article (
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines)
it would appear that for our message size we are at the max. This would
argue that we need to shrink the message size - so perhaps switching to
avro is the next step?


Heartbeats while consuming a message in kafka-python

2017-02-21 Thread Martin Sucha
Hello,

I'm processing messages using kafka-python from a single topic, but
occasionally I have a message that might take a while to process, so I'd
like to send heartbeats while processing the message.

My program works something like this:

consumer = KafkaConsumer(...)
for message in consumer:
    if should_store(message.value):
        store_state(message.value)
    elif should_process(message.value):
        # This loop might take a while (longer than heartbeat interval)
        for value in stored_state(message.value):
            do_something(value)

I think I need to call consumer.client.poll() at regular intervals to send
the heartbeats, so the code would look like this:

consumer = KafkaConsumer(...)
for message in consumer:
    if should_store(message.value):
        store_state(message.value)
    elif should_process(message.value):
        # This loop might take a while (longer than heartbeat interval)
        for index, value in enumerate(stored_state(message.value)):
            do_something(value)
            if index % 1 == 0:
                consumer.client.poll()

Is calling KafkaClient.poll() like this safe to do? The documentation for
KafkaConsumer.poll() says it is incompatible with the iterator interface
but nothing like that is in KafkaClient.poll() documentation.

Also, there is KafkaConsumer.pause(). Do I need to pause the partitions I'm
fetching from before calling consumer.client.poll()? Based on what I have
seen in the code it looks like calling pause() will discard the buffered
messages fetched for that partition so far and then fetch them again when
calling resume(). Is that correct? In this case I'd rather not call pause()
if it is not necessary.

Thanks for clarification.

Best Regards,
Martin


Re: Implementing a non-key in Kafka Streams using the Processor API

2017-02-21 Thread Jan Filipiak

Just a little note here:

if you can take all rows of the "children" table for each key into 
memory, you can get away with using group_by and making a list of them. 
With this aggregation the join is straightforward, and you can use a 
lateral view later to get to the same result. For this you could use the 
current DSL to a greater extent.
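
(A concept-only sketch of the idea, not from the thread, in plain modern
Java (records) rather than the Streams DSL, just to make "group_by, make
a list, then a lateral view" concrete; all types are hypothetical:)

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupCollectJoinSketch {
    record Parent(String id, String name) {}
    record Child(String id, String parentId, String payload) {}
    record Joined(Parent parent, Child child) {}

    // Step 1, the "group_by and make a list": collect all children per
    // parent key. This is the part that must fit in memory per key.
    static Map<String, List<Child>> groupChildren(List<Child> children) {
        return children.stream()
                .collect(Collectors.groupingBy(Child::parentId));
    }

    // Step 2, the join plus "lateral view": one output row per
    // (parent, child) pair, flattening the collected list back out.
    static List<Joined> join(List<Parent> parents,
                             Map<String, List<Child>> childrenByParent) {
        return parents.stream()
                .flatMap(p -> childrenByParent
                        .getOrDefault(p.id(), List.of()).stream()
                        .map(c -> new Joined(p, c)))
                .collect(Collectors.toList());
    }
}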


Best Jan

On 21.02.2017 13:10, Frank Lyaruu wrote:

I've read that JIRA (although I don't understand every single thing), and I
got the feeling it is not exactly the same problem.
I am aware of the Global Tables, and I've tried that first, but I seem
unable to do what I need to do.

I'm replicating a relational database, and on a one-to-many relationship
I'd like to publish a joined message if either of the source streams
receives an update.

In the Global Table Wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams

I see this:
"The GlobalKTable will only be used for doing lookups. That is, data
arriving in the GlobalKTable will not trigger the join. "

So how would I go about doing this?
regards, Frank



On Tue, Feb 21, 2017 at 10:38 AM, Eno Thereska 
wrote:


Hi Frank,

As far as I know the design in that wiki has been superceded by the Global
KTables design which is now coming in 0.10.2. Hence, the JIRAs that are
mentioned there (like KAFKA-3705). There are some extensive comments in
https://issues.apache.org/jira/browse/KAFKA-3705 <
https://issues.apache.org/jira/browse/KAFKA-3705> illustrating why this
design is particularly challenging and why Global KTables was chosen
instead. I'm not sure if you still want to pursue that original design,
since it is not proven to work.

Guozhang, perhaps we need to add a note saying that Global KTables is the
new design?

Thanks
Eno


On 21 Feb 2017, at 07:35, Frank Lyaruu  wrote:

Hi all,

I'm trying to implement joining two Kafka tables using a 'remote' key,
basically as described here:

https://cwiki.apache.org/confluence/display/KAFKA/

Discussion%3A+Non-key+KTable-KTable+Joins

Under the "Implementation Details" there is one line I don't know how to
do:


   1. First of all, we will repartition this KTable's stream, by key
   computed from the *mapper(K, V) → K1*, so that it is co-partitioned by
   the same key. The co-partition topic is partitioned on the new key,

but the

   message key and value are unchanged, and log compaction is turned off.


How do I do that? I've been unable to find any documentation, I've looked
at the StreamPartitionAssignor, that seems relevant, but I could use some
help. Does anyone have an example?

regards, Frank






Re: Implementing a non-key in Kafka Streams using the Processor API

2017-02-21 Thread Jan Filipiak

Hi,

yes the ticket is exactly about what you want to do. The lengthy 
discussion is mainly about what the key of the output KTable is.


@guozhang, would you be interested in seeing what we did so far?

best Jan

On 21.02.2017 13:10, Frank Lyaruu wrote:

I've read that JIRA (although I don't understand every single thing), and I
got the feeling it is not exactly the same problem.
I am aware of the Global Tables, and I've tried that first, but I seem
unable to do what I need to do.

I'm replicating a relational database, and on a one-to-many relationship
I'd like to publish a joined message if either of the source streams
receives an update.

In the Global Table Wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams

I see this:
"The GlobalKTable will only be used for doing lookups. That is, data
arriving in the GlobalKTable will not trigger the join. "

So how would I go about doing this?
regards, Frank



On Tue, Feb 21, 2017 at 10:38 AM, Eno Thereska 
wrote:


Hi Frank,

As far as I know the design in that wiki has been superceded by the Global
KTables design which is now coming in 0.10.2. Hence, the JIRAs that are
mentioned there (like KAFKA-3705). There are some extensive comments in
https://issues.apache.org/jira/browse/KAFKA-3705 <
https://issues.apache.org/jira/browse/KAFKA-3705> illustrating why this
design is particularly challenging and why Global KTables was chosen
instead. I'm not sure if you still want to pursue that original design,
since it is not proven to work.

Guozhang, perhaps we need to add a note saying that Global KTables is the
new design?

Thanks
Eno


On 21 Feb 2017, at 07:35, Frank Lyaruu  wrote:

Hi all,

I'm trying to implement joining two Kafka tables using a 'remote' key,
basically as described here:

https://cwiki.apache.org/confluence/display/KAFKA/

Discussion%3A+Non-key+KTable-KTable+Joins

Under the "Implementation Details" there is one line I don't know how to
do:


   1. First of all, we will repartition this KTable's stream, by key
   computed from the *mapper(K, V) → K1*, so that it is co-partitioned by
   the same key. The co-partition topic is partitioned on the new key,

but the

   message key and value are unchanged, and log compaction is turned off.


How do I do that? I've been unable to find any documentation, I've looked
at the StreamPartitionAssignor, that seems relevant, but I could use some
help. Does anyone have an example?

regards, Frank







Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-21 Thread Jorge Esteban Quilcate Otoya
Thanks for the feedback Matthias.

* 1. You're right. I'll reorder the scenarios.

* 2. Agree. I'll update the KIP.

* 3. I like it, updating to `reset-offsets`

* 4. Agree, removing the `reset-` part

* 5. Yes, the 1.e option without --execute or --export will print out the
current offset and the new offset, which will be the same. The use case for
this option is mostly to use it in combination with --export, to have a
current 'checkpoint' to reset to later. I will add to the KIP what the
output should look like.

* 6. Considering 4., I will update it to `--to-offset`

* 7. I like the idea of unifying these options (plus, minus).
`shift-offsets-by` is a good option, but I would like some more feedback
here about the name. I will update the KIP in the meantime.

* 8. Yes, discussed in 9.

* 9. Agree. I'd love some feedback here. `topic` is already used by
`delete`, and we can add `--all-topics` to consider all topics/partitions
assigned to a group. How could we define specific topics/partitions?

* 10. Hadn't thought about it, but it makes sense.
,, would be enough.

* 11. Agree. Solved with 10.

Also, I have a couple of changes to mention:

1. I have added a reference to the branch where I'm working on this KIP.

2. About the period scenario `--to-period`: I will change it to
`--to-duration`, given that Duration
(https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html) follows
the format 'PnDTnHnMnS' and does not consider daylight saving effects.
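
For illustration, a minimal snippet of the 'PnDTnHnMnS' format that
java.time.Duration parses (the values are arbitrary):

import java.time.Duration;

public class DurationFormatExample {
    public static void main(String[] args) {
        // ISO-8601 duration: 2 days, 3 hours, 4 minutes.
        Duration d = Duration.parse("P2DT3H4M");
        System.out.println(d.toMillis()); // prints 183840000
        // A Duration is an exact amount of time, so unlike calendar-based
        // periods it is not affected by daylight saving transitions.
    }
}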



On Tue, Feb 21, 2017 at 2:47 AM, Matthias J. Sax ()
wrote:

> Hi,
>
> thanks for updating the KIP. Couple of follow up comments:
>
> * Nit: Why is "Reset to Earliest" and "Reset to Latest" a "reset by
> time" option -- IMHO it belongs to "reset by position"?
>
>
> * Nit: Description of "Reset to Earliest"
>
> > using Kafka Consumer's `auto.offset.reset` to `earliest`
>
> I think this is strictly speaking not correct (auto.offset.reset is only
> triggered if no valid offset is found, but this tool explicitly modifies
> committed offsets), and should be phrased as
>
> > using Kafka Consumer's #seekToBeginning()
>
> -> similar issue for description of "Reset to Latest"
>
>
> * Main option: rename to --reset-offsets (plural instead of singular)
>
>
> * Scenario Options: I would remove "reset" from all options, because the
> main argument "--reset-offset" says already what to do:
>
> > bin/kafka-consumer-groups.sh --reset-offset --reset-to-datetime XXX
>
> better (IMHO):
>
> > bin/kafka-consumer-groups.sh --reset-offsets --to-datetime XXX
>
>
>
> * Option 1.e ("print and export current offset") is not intuitive to use
> IMHO. The main option is "--reset-offset" but nothing happens if no
> scenario is specified. It is also not specified what the output should
> look like.
>
> Furthermore, --describe should actually show the currently committed
> offsets for a group. So it seems redundant to have the same option in
> --reset-offsets.
>
>
> * Option 2.a: I would rename to "--reset-to-offset" (or considering the
> comment above to "--to-offset")
>
>
> * Option 2.b and 2.c: I would unify to "--shift-offsets-by" (or similar)
> and accept positive/negative values
>
>
> * About Scope "all": maybe it's better to have an option "--all-topics"
> (or similar). IMHO explicit arguments are preferable to implicit
> settings, to guard against accidental misuse of the tool.
>
>
> * Scope: I also think that "--topic" (singular) and "--topics" (plural)
> are too similar and easy to mix up -- maybe we
> can have two options that are easier to distinguish.
>
>
> * I still think that JSON is not the best format (it's too verbose/hard
> to write for humans from scratch). A simple CSV format with implicit
> schema (topic,partition,offset) would be sufficient.
>
>
> * Why does the JSON contain "group_id" field -- there is parameter
> "--group" to specify the group ID. Would one overwrite the other (what
> order) or would there be an error if "--group" is used in combination
> with "--reset-from-file"?
>
>
>
> -Matthias
>
>
>
>
> On 2/17/17 6:43 AM, Jorge Esteban Quilcate Otoya wrote:
> > Hi,
> >
> > according to the feedback, I've updated the KIP:
> >
> > - We have added and ordered the scenarios, scopes and executions of the
> > Reset Offset tool.
> > - Consider it as an extension to the current `ConsumerGroupCommand` tool
> > - Execution will be possible without generating JSON files.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
> >
> > Looking forward to your feedback!
> >
> > Jorge.
> >
> > On Wed, Feb 8, 2017 at 11:23 PM, Jorge Esteban Quilcate Otoya (<
> > quilcate.jo...@gmail.com>) wrote:
> >
> >> Great. I think I got the idea. What about this options:
> >>
> >> Scenarios:
> >>
> >> 1. Current status
> >>
> >> `kafka-consumer-groups.sh --reset-offset --group cg1`
> >>
> >> 2. To Datetime
> >>
> >> `kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to-datetime
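
As an aside, a minimal sketch (not the tool itself) of the "reset to
earliest" semantics Matthias describes above -- seeking and committing
rather than relying on auto.offset.reset -- assuming the 0.10.x consumer
API; the group, topic, and partitions are hypothetical:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ResetToEarliestSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "cg1"); // hypothetical group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment: no subscription, no rebalance.
            List<TopicPartition> partitions = Arrays.asList(
                    new TopicPartition("foo", 0), new TopicPartition("foo", 1));
            consumer.assign(partitions);

            // Seek to the beginning and commit the resulting positions;
            // auto.offset.reset would only apply when no committed offset
            // exists at all, which is why the tool has to seek explicitly.
            consumer.seekToBeginning(partitions);
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition tp : partitions) {
                offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
            }
            consumer.commitSync(offsets); // persist the reset for the group
            // An exported checkpoint line in Matthias' proposed CSV format
            // could then look like: foo,0,0
        }
    }
}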

Re: Implementing a non-key in Kafka Streams using the Processor API

2017-02-21 Thread Eno Thereska
Hi Frank,

As far as I know the design in that wiki has been superseded by the Global
KTables design, which is now coming in 0.10.2; hence the JIRAs that are
mentioned there (like KAFKA-3705) refer to that superseded design. There are
some extensive comments in https://issues.apache.org/jira/browse/KAFKA-3705
illustrating why this design is particularly challenging and why Global
KTables was chosen instead. I'm not sure if you still want to pursue that
original design, since it is not proven to work.

Guozhang, perhaps we need to add a note saying that Global KTables is the new 
design?

Thanks
Eno

> On 21 Feb 2017, at 07:35, Frank Lyaruu  wrote:
> 
> Hi all,
> 
> I'm trying to implement joining two Kafka tables using a 'remote' key,
> basically as described here:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins
> 
> Under the "Implementation Details" there is one line I don't know how to
> do:
> 
> 
>   1. First of all, we will repartition this KTable's stream, by key
>   computed from the *mapper(K, V) → K1*, so that it is co-partitioned by
>   the same key. The co-partition topic is partitioned on the new key, but the
>   message key and value are unchanged, and log compaction is turned off.
> 
> 
> > How do I do that? I've been unable to find any documentation. I've looked
> > at the StreamPartitionAssignor, which seems relevant, but I could use some
> > help. Does anyone have an example?
> 
> regards, Frank