Re: KIP-162: Enable topic deletion by default

2017-05-27 Thread Vahid S Hashemian
Sure, that sounds good.

I suggested that to keep the command-line behavior consistent.
Plus, removal of ACL access is something that can be easily undone, but
topic deletion is not reversible.
So perhaps a new follow-up JIRA to this KIP could add the confirmation for
topic deletion.
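
To illustrate the asymmetry (commands abbreviated; prompt text paraphrased, so
treat this as a sketch rather than exact tool output):

  bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
    --remove --allow-principal User:alice --operation Read --topic my-topic
  # asks: Are you sure you want to remove ACLs ... (y/n)

  bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-topic
  # no prompt; the topic is marked for deletion immediately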

Thanks.
--Vahid



From:   Gwen Shapira 
To: d...@kafka.apache.org, users@kafka.apache.org
Date:   05/27/2017 11:04 AM
Subject:Re: KIP-162: Enable topic deletion by default



Thanks Vahid,

Do you mind if we leave the command-line out of scope for this?

I can see why adding confirmations, options to bypass confirmations, etc
would be an improvement. However, I've seen no complaints about the current
behavior of the command-line and the KIP doesn't change it at all. So I'd
rather address things separately.

Gwen

On Fri, May 26, 2017 at 8:10 PM Vahid S Hashemian wrote:

> Gwen, thanks for the KIP.
> It looks good to me.
>
> Just a minor suggestion: It would be great if the command asks for a
> confirmation (y/n) before deleting the topic (similar to how removing ACLs
> works).
>
> Thanks.
> --Vahid
>
>
>
> From:   Gwen Shapira 
> To: "d...@kafka.apache.org" , Users
> 
> Date:   05/26/2017 07:04 AM
> Subject:KIP-162: Enable topic deletion by default
>
>
>
> Hi Kafka developers, users and friends,
>
> I've added a KIP to improve our out-of-the-box usability a bit:
> KIP-162: Enable topic deletion by default:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-162+-+Enable+topic+deletion+by+default
>
>
> Pretty simple :) Discussion and feedback are welcome.
>
> Gwen
>
>
>
>
>






Re: Producer Async Issue

2017-05-27 Thread Hans Jespersen
So you want sends never to block, even if that means silently throwing the
messages away and losing them forever? Kafka's persistence is all in the broker,
so there is no client-side storing of data on disk. The client will send in async
mode until the client memory buffer is full. Only then does it block, and this is
by design: at that point it's up to your app to decide whether to throw the
messages away, stop publishing, or store them somewhere outside of Kafka.
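
For illustration, a minimal sketch of that "app decides" pattern with the 0.10.x
Java producer (broker address, topic name and the 1-second cap are placeholders,
not a recommendation): bound how long send() may block and handle the failure in
the callback instead of waiting.

import java.util.Properties;
import org.apache.kafka.clients.producer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("max.block.ms", "1000"); // fail fast instead of blocking for the 60s default

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
    if (exception != null) {
        // the app's choice: drop the record, stop publishing, or spool it outside Kafka
        System.err.println("Send failed, dropping record: " + exception);
    }
});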

The easiest solution would be to run more than one broker so that the cluster is
fault tolerant and the remaining brokers take over for any failed node.

-hans

> On May 27, 2017, at 12:40 PM, Abhimanyu Nagrath  
> wrote:
> 
> Hi Hans,
> 
> What I meant by asynchronous is that when my Kafka broker is down and I try
> to produce a message, the call gets stuck until the configured max.block.ms
> before the rest of the code executes. What I am looking for is that, whether
> the broker is down or not, it should not get stuck.
> 
> 
> 
> Regards,
> Abhimanyu
> 
>> On Sat, May 27, 2017 at 10:30 PM, Hans Jespersen  wrote:
>> 
>> The producer is asynchronous (assuming you mean the Java Producer)
>> 
>> https://kafka.apache.org/0102/javadoc/index.html?org/apache/
>> kafka/clients/producer/KafkaProducer.html
>> 
>> -hans
>> 
>>> On May 27, 2017, at 5:15 AM, Abhimanyu Nagrath <
>> abhimanyunagr...@gmail.com> wrote:
>>> 
>>> Hi,
>>> I am using a Kafka 0.10.2 single-node cluster and I want to make the Kafka
>>> producer completely asynchronous. What configuration do I need to change in
>>> order to make the producer completely asynchronous?
>>> 
>>> 
>>> 
>>> Regards,
>>> Abhimanyu
>> 


Re: Efficient way of Searching Messages By Timestamp - Kafka

2017-05-27 Thread SenthilKumar K
Hi Team, any help here please?

Cheers,
Senthil

On Sat, May 27, 2017 at 8:25 PM, SenthilKumar K wrote:

> Hello Kafka developers and users,
>
> We are exploring the SearchMessageByTimestamp feature in Kafka for our
> use case.
>
> Use case: Kafka will be a realtime message bus, and users should be able
> to pull logs by specifying a start_date and end_date, or ask for the last
> five minutes of data, etc.
>
> I did a POC on SearchMessageByTimestamp; here is the code:
> https://gist.github.com/senthilec566/16e8e28b32834666fea132afc3a4e2f9
> I observed that searching messages is slow.
>
> Here is a small test I did:
> Query: fetch logs of the last *5 minutes*
> Result:
> Number of records fetched: *30*
> Fetch time: *6210* ms
>
> The above test was performed on a topic with 4 partitions. Search and query
> processing happened in each partition; in other words,
> consumer.offsetsForTimes()
> consumer.assign(Arrays.asList(partition))
> consumer.seek(this.partition, offsetTimestamp.offset())
> consumer.poll(100)
> are called for each partition. I realized that this was the reason Kafka
> was taking more time.
>
> What is an efficient way of implementing SearchMessageByTimestamp? Is Kafka
> the right candidate for our use case?
>
> Please add your thoughts here.
>
>
> Cheers,
> Senthil
>


Re: Kafka Streams Usage Patterns

2017-05-27 Thread Jay Kreps
This is great!

-Jay

On Sat, May 27, 2017 at 12:47 PM, Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> Hi all,
>
> I've updated the wiki page with a draft pattern for consecutively growing
> time-windowed aggregations which was discussed some time ago on this
> mailing list.
>
> I'm yet to add the part that cleans up the stores using punctuations. Stay
> tuned.
>
>
> On a somewhat similar subject, I've been working to implement the
> following requirements:
>
> * transaction sums per customer session (simple, just extract non-expired
> session-windowed aggregates from a SessionStore using interactive queries)
>
> * global transaction sums for all *currently active* customer sessions
>
> The second bit proved non-trivial, because session-windowed KTables (or
> any windowed KTables for that matter) don't notify downstream when a window
> expires. And I can't use punctuate until KIP-138 is implemented because
> stream time punctuation is no good in this case (records can stop coming),
> reliable system time punctuation would be needed.
>
> Below is how I implemented this, I'm yet to test it thoroughly.
>
> I wonder if anyone knows of an easier way of achieving the same.
>
> If so, I'm looking forward to suggestions. If not, I'll add that to the
> patterns wiki page too, in case someone else finds it useful.
>
>
> builder
>   .stream(/*key serde*/, /*transaction serde*/, "transaction-topic")
>
>   .groupByKey(/*key serde*/, /*transaction serde*/)
>
>   .aggregate(
>     () -> /*empty aggregate*/,
>     aggregator(),
>     merger(),
>     SessionWindows.with(SESSION_TIMEOUT_MS).until(SESSION_TIMEOUT_MS * 2),
>     /* aggregate serde */,
>     txPerCustomerSumStore() // this store can be queried for per-customer session data
>   )
>
>   .toStream()
>
>   // tombstones only come when a session is merged into a bigger session, so ignore them
>   .filter((key, value) -> value != null)
>
>   // the map/groupByKey/reduce operations below only propagate updates
>   // to the *latest* session per customer downstream
>
>   .map((windowedCustomerId, agg) ->
>     // move the timestamp from the windowed key into the value,
>     // so that we can group by customerId only and reduce to the later value
>     new KeyValue<>(
>       windowedCustomerId.key(), // just customerId
>       new WindowedAggsImpl(     // just like a tuple2, but with nicely named accessors: timestamp() and aggs()
>         windowedCustomerId.window().end(),
>         agg
>       )
>     )
>   )
>   .groupByKey( /*key serde*/, /*windowed aggs serde*/ ) // key is just customerId
>   .reduce( // take the later session value and ignore any older - downstream only cares about *current* sessions
>     (val, agg) -> val.timestamp() > agg.timestamp() ? val : agg,
>     TimeWindows.of(SESSION_TIMEOUT_MS).advanceBy(SESSION_TIMOUT_DELAY_TOLERANCE_MS),
>     "latest-session-windowed"
>   )
>
>   .groupBy((windowedCustomerId, timeAndAggs) -> // calculate totals with maximum granularity, which is per-partition
>     new KeyValue<>(
>       new Windowed<>(
>         windowedCustomerId.key().hashCode() % PARTITION_COUNT_FOR_TOTALS, // KIP-159 would come in handy here, to access the partition number instead
>         windowedCustomerId.window() // used in the interactive queries to pick the oldest not-yet-expired window
>       ),
>       timeAndAggs.aggs()
>     ),
>     new SessionKeySerde<>(Serdes.Integer()),
>     /* aggregate serde */
>   )
>
>   .reduce(
>     (val, agg) -> agg.add(val),
>     (val, agg) -> agg.subtract(val),
>     txTotalsStore() // this store can be queried to get totals per partition for all active sessions
>   );
>
> builder.globalTable(
>   new SessionKeySerde<>(Serdes.Integer()),
>   /* aggregate serde */,
>   changelogTopicForStore(TRANSACTION_TOTALS), "totals");
> // this global table puts per-partition totals on every node, so that they can be
> // easily summed for global totals, picking the oldest not-yet-expired window
>
> TODO: put in StreamPartitioners (with the KTable.through variants added in
> KAFKA-5045) to avoid re-partitioning where I know it's unnecessary.
>
> The idea behind the % PARTITION_COUNT_FOR_TOTALS bit is that I want to
> first do summation with max parallelism and minimize the work needed
> downstream. So I calculate a per-partition sum first to limit the updates
> that the totals topic will receive and the summing work done by the
> interactive queries on the global store. Is this a good way of going about
> it?
>
> Thanks,
>
> Michał
>
> On 09/05/17 18:31, Matthias J. Sax wrote:
>
> Hi,
>
> I started a new Wiki page to collect some common usage patterns for
> Kafka Streams.
>
> Right now, it contains a quick example on "how to compute average". Hope
> we can collect more example like this!
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns
>
>
> -Matthias
>
>
>
> --
>  Michal Borowi

Re: Kafka Streams Usage Patterns

2017-05-27 Thread Michal Borowiecki

Hi all,

I've updated the wiki page with a draft pattern for consecutively 
growing time-windowed aggregations which was discussed some time ago on 
this mailing list.


I'm yet to add the part that cleans up the stores using punctuations. 
Stay tuned.



On a somewhat similar subject, I've been working to implement the 
following requirements:


* transaction sums per customer session (simple, just extract 
non-expired session-windowed aggregates from a SessionStore using 
interactive queries)


* global transaction sums for all *currently active* customer sessions

The second bit proved non-trivial, because session-windowed KTables (or 
any windowed KTables for that matter) don't notify downstream when a 
window expires. And I can't use punctuate until KIP-138 is implemented 
because stream time punctuation is no good in this case (records can 
stop coming), reliable system time punctuation would be needed.


Below is how I implemented this, I'm yet to test it thoroughly.

I wonder if anyone knows of an easier way of achieving the same.

If so, I'm looking forward to suggestions. If not, I'll add that to the 
patterns wiki page too, in case someone else finds it useful.



builder
  .stream(/*key serde*/, /*transaction serde*/, "transaction-topic")

  .groupByKey(/*key serde*/, /*transaction serde*/)

  .aggregate(
    () -> /*empty aggregate*/,
    aggregator(),
    merger(),
    SessionWindows.with(SESSION_TIMEOUT_MS).until(SESSION_TIMEOUT_MS * 2),
    /* aggregate serde */,
    txPerCustomerSumStore() // this store can be queried for per-customer session data
  )

  .toStream()

  // tombstones only come when a session is merged into a bigger session, so ignore them
  .filter((key, value) -> value != null)

  // the map/groupByKey/reduce operations below only propagate updates
  // to the *latest* session per customer downstream

  .map((windowedCustomerId, agg) ->
    // move the timestamp from the windowed key into the value,
    // so that we can group by customerId only and reduce to the later value
    new KeyValue<>(
      windowedCustomerId.key(), // just customerId
      new WindowedAggsImpl(     // just like a tuple2, but with nicely named accessors: timestamp() and aggs()
        windowedCustomerId.window().end(),
        agg
      )
    )
  )
  .groupByKey( /*key serde*/, /*windowed aggs serde*/ ) // key is just customerId
  .reduce( // take the later session value and ignore any older - downstream only cares about *current* sessions
    (val, agg) -> val.timestamp() > agg.timestamp() ? val : agg,
    TimeWindows.of(SESSION_TIMEOUT_MS).advanceBy(SESSION_TIMOUT_DELAY_TOLERANCE_MS),
    "latest-session-windowed"
  )

  .groupBy((windowedCustomerId, timeAndAggs) -> // calculate totals with maximum granularity, which is per-partition
    new KeyValue<>(
      new Windowed<>(
        windowedCustomerId.key().hashCode() % PARTITION_COUNT_FOR_TOTALS, // KIP-159 would come in handy here, to access the partition number instead
        windowedCustomerId.window() // used in the interactive queries to pick the oldest not-yet-expired window
      ),
      timeAndAggs.aggs()
    ),
    new SessionKeySerde<>(Serdes.Integer()),
    /* aggregate serde */
  )

  .reduce(
    (val, agg) -> agg.add(val),
    (val, agg) -> agg.subtract(val),
    txTotalsStore() // this store can be queried to get totals per partition for all active sessions
  );

builder.globalTable(
  new SessionKeySerde<>(Serdes.Integer()),
  /* aggregate serde */,
  changelogTopicForStore(TRANSACTION_TOTALS), "totals");
// this global table puts per-partition totals on every node, so that they can be
// easily summed for global totals, picking the oldest not-yet-expired window

TODO: put in StreamPartitioners (with the KTable.through variants added in
KAFKA-5045) to avoid re-partitioning where I know it's unnecessary.


The idea behind the % PARTITION_COUNT_FOR_TOTALS bit is that I want to 
first do summation with max parallelism and minimize the work needed 
downstream. So I calculate a per-partition sum first to limit the 
updates that the totals topic will receive and the summing work done by 
the interactive queries on the global store. Is this a good way of going 
about it?
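
For reference, a rough sketch of the summing side using interactive queries
against the "totals" global store (assumptions: "streams" is the running
KafkaStreams instance, "Aggs" stands in for the aggregate class, and the expiry
check is a guess; the real logic would pick only the oldest not-yet-expired
window per partition, as described above):

ReadOnlyKeyValueStore<Windowed<Integer>, Aggs> totals =
    streams.store("totals", QueryableStoreTypes.<Windowed<Integer>, Aggs>keyValueStore());

Aggs globalTotal = Aggs.empty(); // placeholder for the empty aggregate
long now = System.currentTimeMillis();
try (KeyValueIterator<Windowed<Integer>, Aggs> it = totals.all()) {
    while (it.hasNext()) {
        KeyValue<Windowed<Integer>, Aggs> entry = it.next();
        // assumed expiry rule: a per-partition window still counts while
        // its end + session timeout lies in the future
        if (entry.key.window().end() + SESSION_TIMEOUT_MS > now) {
            globalTotal = globalTotal.add(entry.value);
        }
    }
}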


Thanks,

Michał


On 09/05/17 18:31, Matthias J. Sax wrote:

Hi,

I started a new Wiki page to collect some common usage patterns for
Kafka Streams.

Right now, it contains a quick example on "how to compute average". Hope
we can collect more example like this!

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns


-Matthias



--
Michal Borowiecki
Senior Software Engineer L4
T: +44 208 742 1600 / +44 203 249 8448
E: michal.borowie...@openbet.com
W: www.openbet.com

OpenBet Ltd
Chiswick Park Building 9
566 Chiswick High Rd
London W4 5XT
UK




Re: Producer Async Issue

2017-05-27 Thread Abhimanyu Nagrath
Hi Hans,

What I meant by asynchronous is that when my Kafka broker is down and I try
to produce a message, the call gets stuck until the configured max.block.ms
before the rest of the code executes. What I am looking for is that, whether
the broker is down or not, it should not get stuck.
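
For reference, the 0.10.2 producer settings that bound this blocking (defaults
shown); lowering max.block.ms makes send() fail fast with a TimeoutException
instead of waiting:

max.block.ms=60000       # how long send()/partitionsFor() may block waiting for metadata or buffer space
buffer.memory=33554432   # total bytes available for buffering records not yet sent to the broker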



Regards,
Abhimanyu

On Sat, May 27, 2017 at 10:30 PM, Hans Jespersen  wrote:

> The producer is asynchronous (assuming you mean the Java Producer)
>
> https://kafka.apache.org/0102/javadoc/index.html?org/apache/
> kafka/clients/producer/KafkaProducer.html
>
> -hans
>
> > On May 27, 2017, at 5:15 AM, Abhimanyu Nagrath <
> abhimanyunagr...@gmail.com> wrote:
> >
> > Hi,
> > I am using a Kafka 0.10.2 single-node cluster and I want to make the Kafka
> > producer completely asynchronous. What configuration do I need to change in
> > order to make the producer completely asynchronous?
> >
> >
> >
> > Regards,
> > Abhimanyu
>


Re: KIP-162: Enable topic deletion by default

2017-05-27 Thread Gwen Shapira
Thanks Vahid,

Do you mind if we leave the command-line out of scope for this?

I can see why adding confirmations, options to bypass confirmations, etc
would be an improvement. However, I've seen no complaints about the current
behavior of the command-line and the KIP doesn't change it at all. So I'd
rather address things separately.

Gwen

On Fri, May 26, 2017 at 8:10 PM Vahid S Hashemian 
wrote:

> Gwen, thanks for the KIP.
> It looks good to me.
>
> Just a minor suggestion: It would be great if the command asks for a
> confirmation (y/n) before deleting the topic (similar to how removing ACLs
> works).
>
> Thanks.
> --Vahid
>
>
>
> From:   Gwen Shapira 
> To: "d...@kafka.apache.org" , Users
> 
> Date:   05/26/2017 07:04 AM
> Subject:KIP-162: Enable topic deletion by default
>
>
>
> Hi Kafka developers, users and friends,
>
> I've added a KIP to improve our out-of-the-box usability a bit:
> KIP-162: Enable topic deletion by default:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-162+-+Enable+topic+deletion+by+default
>
>
> Pretty simple :) Discussion and feedback are welcome.
>
> Gwen
>
>
>
>
>


Re: Producer Async Issue

2017-05-27 Thread Hans Jespersen
The producer is asynchronous (assuming you mean the Java Producer)

https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

-hans

> On May 27, 2017, at 5:15 AM, Abhimanyu Nagrath  
> wrote:
> 
> Hi,
> I am using a Kafka 0.10.2 single-node cluster and I want to make the Kafka
> producer completely asynchronous. What configuration do I need to change in
> order to make the producer completely asynchronous?
> 
> 
> 
> Regards,
> Abhimanyu


Efficient way of Searching Messages By Timestamp - Kafka

2017-05-27 Thread SenthilKumar K
Hello Kafka developers and users,

We are exploring the SearchMessageByTimestamp feature in Kafka for our
use case.

Use case: Kafka will be a realtime message bus, and users should be able to
pull logs by specifying a start_date and end_date, or ask for the last five
minutes of data, etc.

I did a POC on SearchMessageByTimestamp; here is the code:
https://gist.github.com/senthilec566/16e8e28b32834666fea132afc3a4e2f9
I observed that searching messages is slow.

Here is a small test I did:
Query: fetch logs of the last *5 minutes*
Result:
Number of records fetched: *30*
Fetch time: *6210* ms

The above test was performed on a topic with 4 partitions. Search and query
processing happened in each partition; in other words,
consumer.offsetsForTimes()
consumer.assign(Arrays.asList(partition))
consumer.seek(this.partition, offsetTimestamp.offset())
consumer.poll(100)
are called for each partition. I realized that this was the reason Kafka
was taking more time.

What is an efficient way of implementing SearchMessageByTimestamp? Is Kafka
the right candidate for our use case?

Please add your thoughts here.


Cheers,
Senthil
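
One possible optimization, sketched below with the standard 0.10.x consumer API
("logs-topic" and the consumer setup are placeholders, not from the gist): resolve
the start offsets for all four partitions with a single offsetsForTimes() call
instead of one call per partition, so the lookup costs one broker round trip.

// assumes 'consumer' is an already-configured KafkaConsumer<String, String>
long startTs = System.currentTimeMillis() - 5 * 60 * 1000L; // last 5 minutes

List<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo pi : consumer.partitionsFor("logs-topic")) {
    partitions.add(new TopicPartition(pi.topic(), pi.partition()));
}

Map<TopicPartition, Long> query = new HashMap<>();
for (TopicPartition tp : partitions) {
    query.put(tp, startTs);
}

consumer.assign(partitions);
// one round trip for all partitions instead of one per partition
Map<TopicPartition, OffsetAndTimestamp> startOffsets = consumer.offsetsForTimes(query);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : startOffsets.entrySet()) {
    if (e.getValue() != null) {
        consumer.seek(e.getKey(), e.getValue().offset());
    }
}
// then poll() in a loop and stop once record timestamps pass the end of the range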


Re: Trouble with querying offsets when using new consumer groups API

2017-05-27 Thread Abhimanyu Nagrath
Hi Jerry,

I am also facing the same issue. Did you find a solution?

Regards,
Abhimanyu

On Fri, May 26, 2017 at 7:24 PM, Jerry George  wrote:

> Hi
>
> I had a question about the new consumer APIs.
>
> I am having trouble retrieving the offsets once the consumers are
> *disconnected* when using the new consumer v2 API. Following is what I am
> trying to do:
>
> *bin/kafka-consumer-groups.sh -new-consumer --bootstrap-server kafka:9092
> --group group --describe*
>
> If I query this when the consumers are connected, there is no problem.
> However, once the consumers are disconnected it says there is no such
> group, though the offsets are retained in __consumer_offsets.
>
> The offset retention policy is default; i.e. 1440 minutes, I believe.
>
> Once the consumers are reconnected, I am able to query the offsets once
> again.
>
> Could anyone here please help me understand why this is?
>
> Kafka: 0.10.1
> Consumer Library: sarama golang library
>
> Regards,
> Jerry
>
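
For reference, the broker setting behind that 1440-minute default (0.10.x):

offsets.retention.minutes=1440   # committed offsets older than this retention period are discarded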


Producer Async Issue

2017-05-27 Thread Abhimanyu Nagrath
Hi,
I am using a Kafka 0.10.2 single-node cluster and I want to make the Kafka
producer completely asynchronous. What configuration do I need to change in
order to make the producer completely asynchronous?



Regards,
Abhimanyu