Re: kstream transform forward to different topics

2019-02-13 Thread Jan Filipiak
For now, just use the name it gets automatically, or crack the
AbstractStream open with reflection ;)

KIP-307 is doing it the wrong way again: just make the name accessible instead
of making the users supply it :face_with_rolling_eyes:

On 08.02.2019 02:36, Guozhang Wang wrote:
> Hi Nan,
> 
> Glad it helps with your case. Just another note that in the next release
> when KIP-307 is in place [1], you can actually combine the DSL with PAPI by
> naming the last operator that creates your transformed KStream, and then
> manually add the sink nodes like:
> 
> stream2 = stream1.transform(Named.as("myName"));
> 
> topology = builder.build();
> 
> // continue adding to the built topology
> topology.addSink(... "myName");
> 


Re: [ANNOUNCE] New Committer: Vahid Hashemian

2019-01-15 Thread Jan Filipiak
Congratz!

On 15.01.2019 23:44, Jason Gustafson wrote:
> Hi All,
>
> The PMC for Apache Kafka has invited Vahid Hashemian as a project committer
> and we are pleased to announce that he has accepted!
>
> Vahid has made numerous contributions to the Kafka community over the past
> few years. He has authored 13 KIPs with core improvements to the consumer
> and the tooling around it. He has also contributed nearly 100 patches
> affecting all parts of the codebase. Additionally, Vahid puts a lot of
> effort into community engagement, helping others on the mail lists and
> sharing his experience at conferences and meetups.
>
> We appreciate the contributions and we are looking forward to more.
> Congrats Vahid!
>
> Jason, on behalf of the Apache Kafka PMC
>


Re: Doubts in Kafka

2019-01-08 Thread Jan Filipiak

On 08.01.2019 17:11, aruna ramachandran wrote:
> I need to process single sensor messages in serial (order of messages
> should not be changed); at the same time I have to process 1 sensors
> messages in parallel. Please help me to configure the topics and partitions.
>


If you want to process events in order, you only need to make sure that
the messages of every sensor go into the same partition. Not every
sensor needs a partition of its own. Just use sensor_id or a UUID or
something similar as the key while producing. Then you can freely pick
the number of partitions.

Then take the retention time and throughput and try to come out at ~50GB
at rest. If you need more throughput, you can still add more partitions later.

best Jan
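
The keying and sizing advice above can be sketched in plain Java. This is only an illustration, not Kafka client code: partitionFor is a hypothetical stand-in for the producer's default partitioner (the real one hashes the serialized key bytes rather than calling hashCode), reading the ~50GB figure as a per-partition target is an assumption, and the throughput and retention numbers are made up.

```java
public class SensorPartitioningSketch {

    // Hypothetical stand-in for the producer's default partitioner: the same
    // key always maps to the same partition, which is what preserves
    // per-sensor ordering. Kafka itself hashes the serialized key bytes.
    static int partitionFor(String sensorId, int numPartitions) {
        return Math.floorMod(sensorId.hashCode(), numPartitions);
    }

    // Number of partitions so that each one holds roughly
    // targetBytesPerPartition at rest (throughput * retention, rounded up).
    static int partitionCount(long bytesPerSecond, long retentionSeconds,
                              long targetBytesPerPartition) {
        long bytesAtRest = bytesPerSecond * retentionSeconds;
        long count = (bytesAtRest + targetBytesPerPartition - 1) / targetBytesPerPartition;
        return (int) Math.max(1L, count);
    }

    public static void main(String[] args) {
        // Assumed numbers: 2 MB/s ingest, 7 days retention, ~50 GB per partition.
        int partitions = partitionCount(2_000_000L, 7L * 24 * 3600, 50_000_000_000L);
        System.out.println("partitions = " + partitions);
        for (String sensor : new String[] {"sensor-1", "sensor-2", "sensor-1"}) {
            System.out.println(sensor + " -> partition " + partitionFor(sensor, partitions));
        }
    }
}
```

With the real producer the same idea amounts to passing the sensor id as the record key, for example new ProducerRecord<>(topic, sensorId, payload); many sensors then share a partition while each sensor's messages stay in order.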



Re: Graceful Shutdown always fails on multi-broker setup (Windows)

2018-05-09 Thread Jan Filipiak

Hi,

yes, your case is the exception. In usual deployments Kafka has to be
there 100% of the time.
So, as the name rolling restart suggests, you usually upgrade / do
maintenance on boxes a few at a time, depending on how your topics are
laid out across brokers.
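
As a rough illustration of why a rolling restart works while stopping the whole cluster gracefully does not, here is a toy model in plain Java. It is a sketch under simplifying assumptions, not broker code: the three-broker layout, the partition names and the rule "leadership moves to any other live replica" are all hypothetical, and a restarted broker is treated as immediately eligible again, which a real cluster only allows once the replica has caught up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class RollingRestartSketch {

    // Hypothetical layout: partition -> brokers holding a replica.
    private final Map<String, List<Integer>> replicas = new LinkedHashMap<>();
    private final Map<String, Integer> leader = new LinkedHashMap<>();
    private final Set<Integer> live = new TreeSet<>(Arrays.asList(1, 2, 3));

    RollingRestartSketch() {
        replicas.put("t-0", Arrays.asList(1, 2));
        replicas.put("t-1", Arrays.asList(2, 3));
        replicas.put("t-2", Arrays.asList(3, 1));
        for (Map.Entry<String, List<Integer>> e : replicas.entrySet()) {
            leader.put(e.getKey(), e.getValue().get(0)); // first replica leads
        }
    }

    // Controlled shutdown in this model: hand every led partition to another
    // live replica first; refuse if some partition has nowhere to go.
    boolean controlledShutdown(int broker) {
        Map<String, Integer> handOffs = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : leader.entrySet()) {
            if (e.getValue() != broker) continue;
            Integer target = null;
            for (int candidate : replicas.get(e.getKey())) {
                if (candidate != broker && live.contains(candidate)) {
                    target = candidate;
                    break;
                }
            }
            if (target == null) return false;
            handOffs.put(e.getKey(), target);
        }
        leader.putAll(handOffs);
        live.remove(broker);
        return true;
    }

    void start(int broker) {
        live.add(broker);
    }

    // One broker at a time, restarting each before touching the next.
    static boolean rollingRestart() {
        RollingRestartSketch cluster = new RollingRestartSketch();
        for (int broker : new int[] {1, 2, 3}) {
            if (!cluster.controlledShutdown(broker)) return false;
            cluster.start(broker);
        }
        return true;
    }

    // Stop every broker without restarts; returns those that could not stop cleanly.
    static List<Integer> stopAll() {
        RollingRestartSketch cluster = new RollingRestartSketch();
        List<Integer> stuck = new ArrayList<>();
        for (int broker : new int[] {1, 2, 3}) {
            if (!cluster.controlledShutdown(broker)) stuck.add(broker);
        }
        return stuck;
    }

    public static void main(String[] args) {
        System.out.println("rolling restart clean: " + rollingRestart());
        System.out.println("brokers stuck when stopping all: " + stopAll());
    }
}
```

In this model the rolling pass always finds a live replica to take over, while stopping everything leaves at least one broker still leading partitions with no one to hand them to.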





On 09.05.2018 12:13, M. Manna wrote:

Thanks Jan. We have 9 broker-zookeeper setup in production and during
monthly maintenance we need to shut it down gracefully (or reasonably) to
do our work.
Are you saying that it's okay not to shut down the entire cluster?

Also, will this hold true even when we are trying to do rolling upgrade to
1.0x as prescribed here - https://kafka.apache.org/documentation/#upgrade ?

Regards,

On 9 May 2018 at 11:06, Jan Filipiak  wrote:


Hi,

  this is expected.  A graceful shutdown means the broker only
shuts down once it is no longer the leader of any partition.
Therefore you should not be able to gracefully shut down your entire
cluster.

Hope that helps

Best Jan



On 09.05.2018 12:02, M. Manna wrote:


Hello,

I have followed the graceful shutdown process by using the following (in
addition to the default controlled.shutdown.enable)

controlled.shutdown.max.retries=10
controlled.shutdown.retry.backoff.ms=3000

I am always having issues where not all the brokers are shutting
gracefully. And it's always Kafka, not zookeeper.

Has anyone experienced this discrepancy ? If so, could you please let me
know how to get around this issue?

Regards,






Re: Graceful Shutdown always fails on multi-broker setup (Windows)

2018-05-09 Thread Jan Filipiak

Hi,

 this is expected.  A graceful shutdown means the broker only
shuts down once it is no longer the leader of any partition.
Therefore you should not be able to gracefully shut down your entire 
cluster.


Hope that helps

Best Jan


On 09.05.2018 12:02, M. Manna wrote:

Hello,

I have followed the graceful shutdown process by using the following (in
addition to the default controlled.shutdown.enable)

controlled.shutdown.max.retries=10
controlled.shutdown.retry.backoff.ms=3000

I am always having issues where not all the brokers are shutting
gracefully. And it's always Kafka, not zookeeper.

Has anyone experienced this discrepancy ? If so, could you please let me
know how to get around this issue?

Regards,





Re: Kafka Consumer Offsets unavailable during rebalancing

2018-02-13 Thread Jan Filipiak

I would encourage you to do so.
I also think it's not reasonable behavior.

On 13.02.2018 11:28, Wouter Bancken wrote:

We have upgraded our Kafka version as an attempt to solve this issue.
However, the issue is still present in Kafka 1.0.0.

Can I log a bug for this in JIRA?

Wouter

On 5 February 2018 at 09:22, Wouter Bancken wrote:


The consumers in consumer group 'X' do not have a regex subscription
matching the newly created topic 'C'. They simply subscribe with
the subscribe(java.util.Collection topics) method on
topics 'A' and 'B'.

Shouldn't the consumer group have a different state from "Stable" during a
rebalancing regardless of the cause? How else can we determine the consumer
lag of the group during the rebalancing?

Best regards,
Wouter


On 5 February 2018 at 00:13, Hans Jespersen  wrote:


Do the consumers in consumer group ‘X’ have a regex subscription that
matches the newly created topic ‘C’?

If they do then they will only discover this new topic once their
‘metadata.max.age.ms’ metadata refresh interval has passed, which
defaults to 5 minutes.

metadata.max.age.ms: The period of time in milliseconds after which
we force a refresh of metadata even if we haven't seen any partition
leadership changes to proactively discover any new brokers or partitions.

-hans



On Feb 4, 2018, at 2:16 PM, Wouter Bancken wrote:

Hi Hans,

Thanks for the response!

However, I get this result for all topics, not just for the newly created
topic.

Situation sketch:
1. I have a consumer group 'X' subscribed to topics 'A' and 'B' with
partition assignments and lag information. Consumer group 'X' is "Stable".
2a. Topic 'C' is (being) created.
2b. During this creation, I do not have a partition assignment for consumer
group 'X' for topics 'A' and 'B' but the consumer group is still "Stable".
3. A second later: I have a partition assignment for consumer group 'X' for
topics 'A' and 'B' again and the consumer group is still "Stable".

I expected the state of consumer group 'X' during step 2b to be
"PreparingRebalance" or "AwaitingSync".

Best regards,
Wouter


On 4 February 2018 at 21:25, Hans Jespersen  wrote:

I believe this is expected behavior.

If there are no subscriptions to a new topic, and therefore no partition
assignments, and definitely no committed offsets, then lag is an undefined
concept. When the consumers subscribe to this new topic they may choose to
start at the beginning or end of the commit log so the lag cannot be
predicted in advance.

-hans


On Feb 4, 2018, at 11:51 AM, Wouter Bancken wrote:

Can anyone clarify if this is a bug in Kafka or the expected behavior?

Best regards,
Wouter


On 30 January 2018 at 21:04, Wouter Bancken wrote:

Hi,

I'm trying to write an external tool to monitor consumer lag on Apache
Kafka.

For this purpose, I'm using the kafka-consumer-groups tool to fetch the
consumer offsets.

When using this tool, partition assignments seem to be unavailable
temporarily during the creation of a new topic even if the consumer group
has no subscription on this new topic. This seems to match the
documentation saying *"Topic metadata changes which have no impact on
subscriptions cause resync"*.

However, when this occurs I'd expect the state of the consumer to be
"PreparingRebalance" or "AwaitingSync" but it is simply "Stable".

Is this a bug in the tooling or is there a different way to obtain the
correct offsets for a consumer group during a rebalance?

I'm using Kafka 10.2.1 but I haven't found any related issues in recent
changelogs.
Best regards,
Wouter







Re: Broker won't exit...

2018-01-10 Thread Jan Filipiak

Hi,

the brokers still try to do a graceful shutdown, I suppose?
A broker only shuts down once it is no longer the leader of any partition.

Can you verify that there are other brokers alive that took over leadership,
and that the broker in question stepped down as leader for all partitions?

Best Jan



On 10.01.2018 12:57, Ted Yu wrote:

Skip: Can you pastebin the stack trace of the stuck broker?
Thanks

 Original message
From: Skip Montanaro
Date: 1/10/18 3:52 AM (GMT-08:00)
To: users@kafka.apache.org
Subject: Re: Broker won't exit...

Did you stop the broker before stopping zookeeper?


Yes. My stop script executes the server stop scripts in reverse order from
my start script. Should I have stuck in a couple second sleep between
stopping the brokers and stopping zookeeper?

I was actually running two brokers. The one my stop script stopped first
exited properly.

Skip




Re: Consuming a state store (KTable) basics - 1.0.0

2017-12-07 Thread Jan Filipiak

Hi Peter,

glad it helped,

these are the preferred ways indeed.




On 07.12.2017 15:58, Peter Figliozzi wrote:

Thanks Jan, super helpful!  To summarize (I hope I've got it right), there
are only two ways for external applications to access data derived from a
KTable:

1.  Inside the streams application that builds the KTable, create a
KafkaStreams.store and expose to the outside via a service.

2.  Convert the KTable to a stream and write to a new Kafka topic.  Then
external apps can just consume this feed.  If we only care about the latest
updates, make the topic log-compacted.

Latest value per key versus last updated might be a different story here;
in the end there is a lot of flexibility that everyone is free to
explore.


Best Jan



Thanks,

Pete

On Thu, Dec 7, 2017 at 1:42 AM, Jan Filipiak wrote:


Hi,

you should be able to retrieve your store with

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1021

This would give you access to the store from inside your current
application. In your Streams application you could then
expose this store through, say, a REST or any other RPC interface, to let
applications from outside your JVM query it.

So I would say the blog post still applies quite well.

Hope this helps

Best Jan


On 07.12.2017 04:59, Peter Figliozzi wrote:


I've written a Streams application which creates a KTable like this:

val myTable: KTable[String, GenericRecord] = myStream
  .groupByKey()
  .aggregate(myInitializer, myAdder, myStore)

where myStore was configured like this:

val myStore: Materialized[String, GenericRecord, KeyValueStore[Bytes, Array[Byte]]] =
  Materialized
    .as("my-store")
    .withKeySerde(Serdes.String())
    .withValueSerde(genericValueSerde)

What I'd like to do now is query (read) this store from a separate
application.  How do I query it in 1.0.0?  With a KTable constructor, using
the store string as the topic, i.e.:

public <K, V> KTable<K, V> table(
    java.lang.String topic,
    Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)

Or some other way?

I saw this blog post
<https://blog.codecentric.de/en/2017/03/interactive-queries-in-apache-kafka-streams/>
but it appears to be only applicable to the older version of Kafka (please
correct me if I'm wrong).

Thanks,

Pete






Re: Consuming a state store (KTable) basics - 1.0.0

2017-12-06 Thread Jan Filipiak

Hi,

you should be able to retrieve your store with

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1021

This would give you access to the store from inside your current
application. In your Streams application you could then
expose this store through, say, a REST or any other RPC interface, to let
applications from outside your JVM query it.
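
To make the "expose the store over REST" idea concrete, here is a minimal sketch that uses only the JDK's built-in HTTP server. It is not Streams code: the STORE map is a hypothetical stand-in for the read-only store handle you would obtain inside the application (as far as I know via KafkaStreams#store with the store name "my-store" and QueryableStoreTypes.keyValueStore()), and the /store/ path, the sample key and the plain-text responses are assumptions chosen for illustration.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StoreRestSketch {

    // Stand-in for the read-only view of "my-store". In a real Streams app
    // this would be the store handle obtained from the KafkaStreams instance.
    static final Map<String, String> STORE = new ConcurrentHashMap<>();

    // Resolve a request path such as /store/customer-42 to the stored value,
    // or null when the path or key is unknown.
    static String lookup(String path) {
        String prefix = "/store/";
        if (path == null || !path.startsWith(prefix)) return null;
        return STORE.get(path.substring(prefix.length()));
    }

    // Serve point lookups over HTTP with the JDK's built-in server.
    static HttpServer serve(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/store/", exchange -> {
            String value = lookup(exchange.getRequestURI().getPath());
            byte[] body = (value == null ? "not found" : value).getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(value == null ? 404 : 200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws Exception {
        STORE.put("customer-42", "3 open policies");
        HttpServer server = serve(0); // port 0: let the OS pick a free port
        int port = server.getAddress().getPort();
        HttpURLConnection conn = (HttpURLConnection)
                URI.create("http://localhost:" + port + "/store/customer-42").toURL().openConnection();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            System.out.println(conn.getResponseCode() + " " + in.readLine());
        } finally {
            server.stop(0);
        }
    }
}
```

Keep in mind that each Streams instance only holds the partitions of the store assigned to it, so with several instances a lookup service like this has to route a key to the instance that hosts it.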


So I would say the blog post still applies quite well.

Hope this helps

Best Jan


On 07.12.2017 04:59, Peter Figliozzi wrote:

I've written a Streams application which creates a KTable like this:

val myTable: KTable[String, GenericRecord] = myStream
 .groupByKey()
 .aggregate(myInitializer, myAdder, myStore)

where myStore was configured like this:

val myStore: Materialized[String, GenericRecord, KeyValueStore[Bytes, Array[Byte]]] =
  Materialized
    .as("my-store")
    .withKeySerde(Serdes.String())
    .withValueSerde(genericValueSerde)

What I'd like to do now is query (read) this store from a separate
application.  How do I query it in 1.0.0?  With a KTable constructor, using
the store string as the topic, i.e.:

public <K, V> KTable<K, V> table(
    java.lang.String topic,
    Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)

Or some other way?

I saw this blog post

but it appears to be only applicable to the older version of Kafka (please
correct me if I'm wrong).

Thanks,

Pete





Re: Mirrormaker consumption slowness

2017-12-06 Thread Jan Filipiak

Hi,

Two questions: Is your MirrorMaker co-located with the source or the target?
What are the send and receive buffer sizes on the connections that span
the WAN?

Hope we can get you some help.

Best jan



On 06.12.2017 14:36, Xu, Zhaohui wrote:

Any update on this issue?

We ran into a similar situation recently. MirrorMaker is used to
replicate messages between clusters in different DCs. But sometimes a portion
of the partitions show high consumer lag, and tcpdump shows a similar packet
delivery pattern. The behavior is rather odd and not self-explanatory.
Could it have anything to do with the number of consumers being too
large? In our case, we have around 100 consumer connections per
broker.

Regards,
Jeff

On 12/5/17, 10:14 AM, "tao xiao"  wrote:

 Hi,
 
 any pointer will be highly appreciated
 
 On Thu, 30 Nov 2017 at 14:56 tao xiao  wrote:
 
 > Hi There,
 >
 > We are running into a weird situation when using Mirrormaker to replicate
 > messages between Kafka clusters across datacenters, and are reaching out
 > for help in case you have encountered this kind of problem before or have
 > some insight into this kind of issue.
 >
 > Here is the scenario. We have set up a deployment where we run 30
 > Mirrormaker instances on 30 different nodes. Each Mirrormaker instance is
 > configured with num.streams=1, so only one consumer runs. The topics to
 > replicate are configured with 100 partitions and data is almost evenly
 > distributed across all partitions. After running for a period of time,
 > some of the Mirrormaker instances seem to slow down and consume at a
 > relatively slow speed from the source Kafka cluster. The output of
 > tcptrack shows the consume rate of problematic instances dropped to
 > ~1MB/s, while the other healthy instances consume at a rate of ~3MB/s. As
 > a result, the consumer lag for the corresponding partitions is going up.
 >
 > After triggering a tcpdump, we noticed the traffic pattern in the tcp
 > connections of problematic Mirrormaker instances is very different from
 > the others. Packets flowing in problematic tcp connections are relatively
 > small, and seq and ack packets are basically coming in one after another.
 > In healthy tcp connections the packets come in a different pattern:
 > basically several seq packets come with one ack packet. The screenshots
 > below show the situation; both captures were taken on the same
 > Mirrormaker node.
 >
 > problematic connection.  ps. 10.kfk.kfk.kfk is kafka broker, 10.mm.mm.mm
 > is Mirrormaker node
 >
 > https://imgur.com/Z3odjjT
 >
 > healthy connection
 >
 > https://imgur.com/w0A6qHT
 >
 > If we stop the problematic Mirrormaker instance and other instances
 > take over the lagged partitions, they can consume messages quickly and
 > catch up on the lag soon. So the broker in the source Kafka cluster is
 > supposed to be good. But if Mirrormaker itself causes the issue, how can
 > one tcp connection be good while others are problematic, since the
 > connections are all established in the same manner by the Kafka library?
 >
 > Consumer configuration for the Mirrormaker instance is as below.
 >
 > auto.offset.reset=earliest
 > partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
 > heartbeat.interval.ms=1
 > session.timeout.ms=12
 > request.timeout.ms=15
 > receive.buffer.bytes=1048576
 > max.partition.fetch.bytes=2097152
 > fetch.min.bytes=1048576
 >
 > Kafka version is 0.10.0.0 and we have Kafka and Mirrormaker running on
 > Ubuntu 14.04
 >
 > Any response is appreciated.
 >
 > Regards,
 >
 > Tao
 >
 





Re: Configuration: Retention and compaction

2017-12-03 Thread Jan Filipiak

Hi

the only retention time that applies to compacted topics is
delete.retention.ms: the duration for which tombstones for deletes are kept
in the topic during compaction.


A very detailed explanation of what is going on can be found here:

https://kafka.apache.org/documentation/#compaction

Hope this helps

Best Jan
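
A toy model of what compaction keeps may help here. This is a plain-Java sketch under simplifying assumptions, not the log cleaner: the Rec class and compact method are hypothetical names, the whole log is compacted in one go (the real cleaner leaves the active segment alone and works segment by segment), and tombstone age is measured from the record timestamp, which is only an approximation of how the broker times it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {

    static final class Rec {
        final long offset;
        final String key;
        final String value;      // null marks a tombstone (delete)
        final long timestampMs;

        Rec(long offset, String key, String value, long timestampMs) {
            this.offset = offset;
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Keep only the newest record per key; additionally drop a tombstone once
    // it is older than deleteRetentionMs. Input must be in offset order.
    static List<Rec> compact(List<Rec> log, long nowMs, long deleteRetentionMs) {
        Map<String, Rec> newest = new HashMap<>();
        for (Rec r : log) {
            newest.put(r.key, r); // later offsets overwrite earlier ones
        }
        List<Rec> kept = new ArrayList<>();
        for (Rec r : log) {
            if (newest.get(r.key) != r) continue; // superseded by a newer record
            boolean expiredTombstone =
                    r.value == null && nowMs - r.timestampMs > deleteRetentionMs;
            if (!expiredTombstone) kept.add(r);
        }
        return kept;
    }

    static List<Long> offsets(List<Rec> log) {
        List<Long> result = new ArrayList<>();
        for (Rec r : log) result.add(r.offset);
        return result;
    }

    public static void main(String[] args) {
        List<Rec> log = Arrays.asList(
                new Rec(0, "a", "1", 0),
                new Rec(1, "b", "1", 0),
                new Rec(2, "a", "2", 10),
                new Rec(3, "b", null, 10)); // delete of key b
        System.out.println("shortly after the delete: " + offsets(compact(log, 20, 100)));
        System.out.println("after delete.retention.ms: " + offsets(compact(log, 1000, 100)));
    }
}
```

Note that no time-based retention appears anywhere in the model: old values disappear because a newer record for the same key exists, not because they aged out.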


On 03.12.2017 20:27, Dmitry Minkovsky wrote:

This is a pretty stupid question. Most likely I should verify these by
observation, but really I want to verify that my understanding of the
documentation is correct:

Suppose I have topic configurations like:

retention.ms=$time
cleanup.policy=compact


My questions are:

1. After $time, any offsets older than $time will be eligible for
compaction?
2. Regardless of $time, any offsets in the current segment will not be
compacted?


Thank you,
Dmitry





Re: Joins in Kafka Streams and partitioning of the topics

2017-11-30 Thread Jan Filipiak
There are some oddities in your topology that make me wonder if
they are the true drivers of your question.


https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L300
This feels like it should be a KTable to begin with, for example; otherwise
it is not clear how big this is supposed to grow.

https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L325
Same thing for policies. A GlobalKTable might be brought in later as some
sort of performance optimisation if you bloat up from too many repartitions,
but my opinion of it is not too high.


Hope that helps, just keep the questions coming. Also check if you might
want to join confluentcommunity on Slack.
I could never imagine that something like an insurance can really be
modelled as 4 streams ;)


Best Jan





On 30.11.2017 21:07, Artur Mrozowski wrote:

what if I start two instances of that application?  Does the state migrate
between the applications? Is it then I have to use a global table?

BR
Artur

On Thu, Nov 30, 2017 at 7:40 PM, Jan Filipiak wrote:


Hi,

Haven't checked your code. But from what you describe you should be fine.
Upgrading the version might help here and there but it should still work with
0.10, I guess.

Best Jan



On 30.11.2017 19:16, Artur Mrozowski wrote:


Thank you Damian, it was very helpful.
I have implemented my solution in version 0.11.0.2 but there is one thing I
still wonder.
So what I try to do is what is described in KIP 150. Since it didn't make
it into the release for 1.0 I do it the old fashioned way.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
First
KTable table1 =
builder.stream("topic1").groupByKey().aggregate(initializer1,
aggregator1, aggValueSerde1, storeName1);


for all the four topics and then I join the results.
And here is the thing, the topics are partitioned and I don't use global
tables, nor keyed messages and it seems to work fine.

From Confluent's documentation one could get the impression that when reading
from partitioned topics you need to use global tables. But is it really
necessary in this case?
And if not then why?

Thanks again
Artur

Here is the link to my implementation

https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java

On Wed, Nov 22, 2017 at 12:10 PM, Damian Guy wrote:

Hi Artur,

KafkaStreams 0.10.0.0 is quite old and a lot has changed and been fixed
since then. If possible i'd recommend upgrading to at least 0.11.0.2 or
1.0.
For joins you need to ensure that the topics have the same number of
partitions (which they do) and that they are keyed the same.

Thanks,
Damian
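
The co-partitioning requirement for joins mentioned above (same number of partitions, keyed the same) can be illustrated with a small plain-Java check. This is only a sketch: partitionFor is a hypothetical stand-in for the real partitioner, which hashes the serialized key bytes, and the sample keys and partition counts are made up.

```java
public class CoPartitioningSketch {

    // Hypothetical stand-in for the partitioner used by both input topics.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // True when every sample key lands on the same partition number in both
    // topics, which is what lets one task see both sides of a join for a key.
    static boolean coPartitioned(String[] keys, int leftPartitions, int rightPartitions) {
        for (String key : keys) {
            if (partitionFor(key, leftPartitions) != partitionFor(key, rightPartitions)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String[] keys = new String[10];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = "customer-" + i;
        }
        System.out.println("4 vs 4 partitions: " + coPartitioned(keys, 4, 4));
        System.out.println("4 vs 6 partitions: " + coPartitioned(keys, 4, 6));
    }
}
```

With equal partition counts and the same key, matching records always end up in the same partition number on both sides; once the counts differ, some keys are split across different partitions and a partition-local join would miss them.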

On Wed, 22 Nov 2017 at 11:02 Artur Mrozowski  wrote:

Hi,

I am joining 4 different topics with 4 partitions each using 0.10.0.0
version of Kafka Streams.  The joins are KTable to KTable. Is there
anything I should be aware of considering partitions or version of Kafka
Streams? In other words should I be expecting consistent results or do I
need to for example use Global tables.

I'd like to run that application on Kubernetes later on. Should I think of
anything or do different instances of the same Kafka Streams application
take care of management of the state?

Grateful for any thoughts or a piece of advice

Best Regards
/Artur






Re: Joins in Kafka Streams and partitioning of the topics

2017-11-30 Thread Jan Filipiak

Hi,

Haven't checked your code. But from what you describe you should be fine.
Upgrading the version might help here and there but it should still work
with 0.10, I guess.

Best Jan


On 30.11.2017 19:16, Artur Mrozowski wrote:

Thank you Damian, it was very helpful.
I have implemented my solution in version 0.11.0.2 but there is one thing I
still wonder.
So what I try to do is what is described in KIP 150. Since it didn't make
to the release for 1.0 I do it the old fashioned way.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
First
KTable table1 =
builder.stream("topic1").groupByKey().aggregate(initializer1,
aggregator1, aggValueSerde1, storeName1);


for all the four topics and then I join the results.
And here is the thing, the topics are partitioned and I don't use global
tables, nor keyed messages and it seems to work fine.

From Confluent's documentation one could get the impression that when reading
from partitioned topics you need to use global tables. But is it really
necessary in this case?
And if not then why?

Thanks again
Artur

Here is the link to my implementation

https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java

On Wed, Nov 22, 2017 at 12:10 PM, Damian Guy  wrote:


Hi Artur,

KafkaStreams 0.10.0.0 is quite old and a lot has changed and been fixed
since then. If possible i'd recommend upgrading to at least 0.11.0.2 or
1.0.
For joins you need to ensure that the topics have the same number of
partitions (which they do) and that they are keyed the same.

Thanks,
Damian

On Wed, 22 Nov 2017 at 11:02 Artur Mrozowski  wrote:


Hi,
I am joining 4 different topics with 4 partitions each using 0.10.0.0
version of Kafka Streams.  The joins are KTable to KTable. Is there
anything I should be aware of considering partitions or version of Kafka
Streams? In other words should I be expecting consistent results or do I
need to for example use Global tables.

I'd like to run that application on Kubernetes later on. Should I think of
anything or do different instances of the same Kafka Streams application
take care of management of the state?

Grateful for any thoughts or a piece of advice

Best Regards
/Artur





Re: Plans to extend streams?

2017-11-30 Thread Jan Filipiak

hi,

I understand your point better now.

I think systems of that kind have been built plenty of times and I never
liked their trade-offs.


Samza and Kafka Streams form a great alternative to what is out there in
great numbers.


I am a big fan of how this is designed and think it's really great. Maybe
you should give it a shot?

Just to get you interested:
With extremely detailed partition assignment and deploying stream jobs on
broker instances
you can align all the topics so that you get basically the same kind of
shuffle other systems use.
Attaching a little bit of "cruise-control"
https://engineering.linkedin.com/blog/2017/08/open-sourcing-kafka-cruise-control
could also handle node failures. But usually this is not necessary. The
hop across the broker is usually just too efficient to make this kind of
fuss worthwhile.

Hope this can convince you to try it out.


Best Jan


On 29.11.2017 20:15, Guozhang Wang wrote:

Hello Adrienne,

I think your suggested feature is to use not only Kafka for inter-process
communication but also make it configurable to use TCP directly, right?

There are a few people asking about this before, especially for not using
Kafka for repartitioning (think: shuffling in the batch world), but let
them go through TCP between processes. Though this is doable, I'd point out
that it may have many side-effects such as:

1) back pressure: the Streams library does not worry about back pressure at
all since all communication channels are persistent (Kafka topics); using TCP,
you need to face the back pressure issue again.
2) exactly once semantics: the transactional messaging is leveraged by
Streams to achieve EOS, and extending TCP means that we need to add more
gears to handle TCP data loss / duplicates (e.g. other frameworks have been
using buffers with epoch boundaries to do that).
3) state snapshots: imagine if you are shutting down your app, we then need
to make sure all in-flight messages with TCP are drained because otherwise
we are not certain if the committed offsets are valid or not.



Guozhang


On Wed, Nov 29, 2017 at 8:26 AM, Adrienne Kole wrote:


Hi,

You misunderstood the focus of the post perhaps or I could not explain
properly. I am not claiming that Streams is limited to a single node.
Although the whole topology instance can be limited to a single node (each
node runs the whole topology), this is something else.
Also, I think that the "moving 100s of GB data per day" claim is orthogonal,
and this is not big/fast enough to reason from.

The thing is that, for some use-cases, the streams-kafka-streams connection can
be a bottleneck.  Yes, if I have 40GB/s or infiniband network bandwidth
this might not be an issue.

Consider a simple topology with operators A->B->C. (B forces a
re-partition)
  Streams nodes are s1(A), s2 (B,C) and kafka resides on cluster k, which
might be in different network switch.
So, rather than transferring data k->s1->s2, we make a round trip
k->s1->k->s2. If we know that s1 and s2 are in the same network and data
transfer is fast between two, we should not go through another intermediate
layer.


Thanks.



On Wed, Nov 29, 2017 at 4:52 PM, Jan Filipiak wrote:


Hey,

you are making some wrong assumptions here.
Kafka Streams is in no way single threaded or
limited to one physical instance.
Having connectivity issues to your brokers is IMO
a problem with the deployment and not at all
with how kafka streams is designed and works.

Kafka Streams moves hundreds of GB per day for us.

Hope this helps.

Best Jan



On 29.11.2017 15:10, Adrienne Kole wrote:


Hi,

The purpose of this email is to get overall intuition for the future plans
of streams library.

The main question is that, will it be a single threaded application in the
long run and serve microservices use-cases, or are there any plans to
extend it to multi-node execution framework with less kafka dependency.

Currently, each streams node 'talks' with kafka cluster and they can
indirectly talk with each other again through kafka. However, especially if
kafka is not in the same network with streams nodes (actually this can
happen if they are in the same network as well) this will cause high
network overhead and inefficiency.

One solution for this (bypassing network overhead) is to deploy streams
node on kafka cluster to ensure the data locality. However, this is not
recommended as the library and kafka can affect each other's performance
and  streams does not necessarily have to know the internal data
partitioning of kafka.

Another solution would be extending streams library to have a common
runtime. IMO, preserving the current selling points of streams (like
dynamic scale in/out) with this kind of extensions can be very good
improvement.

So my question is that, will streams in the long/short run, will extend its
use-cases to massive and efficient stream processing (and compete with
spark) or stay and strengthen its current position?

Cheers,
Adrienne









Re: Plans to extend streams?

2017-11-29 Thread Jan Filipiak

Hey,

you are making some wrong assumptions here.
Kafka Streams is in no way single threaded or
limited to one physical instance.
Having connectivity issues to your brokers is IMO
a problem with the deployment and not at all
with how kafka streams is designed and works.

Kafka Streams moves hundreds of GB per day for us.

Hope this helps.

Best Jan


On 29.11.2017 15:10, Adrienne Kole wrote:

Hi,

The purpose of this email is to get overall intuition for the future  plans
of streams library.

The main question is that, will it be a single threaded application in the
long run and serve microservices use-cases, or are there any plans to
extend it to multi-node execution framework with less kafka dependency.

Currently, each streams node 'talks' with kafka cluster and they can
indirectly talk with each other again through kafka. However, especially if
kafka is not in the same network with streams nodes (actually this can
happen if they are in the same network as well) this will cause high
network overhead and inefficiency.

One solution for this (bypassing network overhead) is to deploy streams
node on kafka cluster to ensure the data locality. However, this is not
recommended as the library and kafka can affect each other's performance
and  streams does not necessarily have to know the internal data
partitioning of kafka.

Another solution would be extending streams library to have a common
runtime. IMO, preserving the current selling points of streams (like
dynamic scale in/out) with this kind of extensions can be very good
improvement.

So my question is that, will streams in the long/short run, will extend its
use-cases to massive and efficient stream processing (and compete with
spark) or stay and strengthen its current position?

Cheers,
Adrienne





Re: No. of Kafk Instances in Single machine

2017-11-06 Thread Jan Filipiak

Hi,

I probably would recommend you to go for 1 instance. You can bump a few 
thread configs to match your hardware better.


Best Jan

On 06.11.2017 12:23, chidigam . wrote:

Hi All,
Let say, I have big machine, which having 120GB RAM,  with lot of cores,
and very high disk capacity.

How many no. of kafka instances are recommended? Is there any general
principle I can apply, to calculate optimal no.

Any help in this regards is highly appreciated.

Regards
Bhanu





Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-10-26 Thread Jan Filipiak

Thanks for the remarks. hope I didn't miss any.
Not even sure if it makes sense to introduce A and B or just stick with 
"this ktable", "other ktable"


Thank you
Jan


On 27.10.2017 06:58, Ted Yu wrote:

Do you mind addressing my previous comments ?

http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+DISCUSS+KIP+213+Support+non+key+joining+in+KTable

On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak 
wrote:


Hello everyone,

this is the new discussion thread after the ID-clash.

Best
Jan

__


Hello Kafka-users,

I want to continue with the development of KAFKA-3705, which allows the
Streams DSL to perform KTableKTable-Joins when the KTables have a
one-to-many relationship.
To make sure we cover the requirements of as many users as possible and
have a good solution afterwards I invite everyone to read through the KIP I
put together and discuss it here in this Thread.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
https://issues.apache.org/jira/browse/KAFKA-3705
https://github.com/apache/kafka/pull/3720

I think a public discussion and vote on a solution is exactly what is
needed to bring this feature into kafka-streams. I am looking forward to
everyone's opinion!

Please keep the discussion on the mailing list rather than commenting on
the wiki (wiki discussions get unwieldy fast).

Best
Jan







[DISCUSS] KIP-213 Support non-key joining in KTable

2017-10-25 Thread Jan Filipiak

Hello Kafka-users,

I want to continue with the development of KAFKA-3705, which allows the 
Streams DSL to perform KTableKTable-Joins when the KTables have a 
one-to-many relationship.
To make sure we cover the requirements of as many users as possible and
have a good solution afterwards I invite everyone to read through the
KIP I put together and discuss it here in this thread.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
https://issues.apache.org/jira/browse/KAFKA-3705
https://github.com/apache/kafka/pull/3720

I think a public discussion and vote on a solution is exactly what is
needed to bring this feature into kafka-streams. I am looking forward
to everyone's opinion!


Please keep the discussion on the mailing list rather than commenting on 
the wiki (wiki discussions get unwieldy fast).


Best
Jan








Re: Log Compaction Not Picking up Topic

2017-10-25 Thread Jan Filipiak

Hi,

unfortunately there is nothing trivial you could do here.
Without upgrading your Kafka brokers you can only bounce the partition back
and forth between brokers so they compact while it's still small.

With upgrading you could also just cherry-pick this very commit or add a
log statement to verify.


Given the log sizes you're dealing with, I am very confident that this is
your issue.


Best Jan


On 25.10.2017 12:21, Elmar Weber wrote:

Hi,

On 10/25/2017 12:15 PM, Xin Li wrote:
> I think that is a bug, and should be fixed in this task
> https://issues.apache.org/jira/browse/KAFKA-6030.
> We experience that in our kafka cluster, we just check out the
> 11.0.2 version, build it ourselves.


thanks for the hint, as it looks like a calculation issue, would it be 
possible to verify this by manually changing the clean ratio or some 
other settings?


Best,
Elmar





Re: Kafka Connect Sink Connector for multiple JDBC sinks

2017-09-16 Thread Jan Filipiak

Hi,

entirely depends on how you want to serialize. You should be able to get
everything running on Windows anyhow. Nothing except the broker is
really extensively using OS support for operating.


To answer your initial question: You would simply start multiple sinks 
and give each sink a different connect string. That should do what you 
want instantly
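
A minimal sketch of what "multiple sinks, each with a different connect string" could look like, written as plain Java that only builds the per-connector property maps. The keys `name`, `connector.class`, `tasks.max`, `topics` and `connection.url` are the usual Kafka Connect / kafka-connect-jdbc settings; the topic name, connector names and JDBC URLs are made up for illustration, and how the maps are submitted to a Connect worker is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MultiSinkConfigs {

    // Builds one sink connector config per JDBC URL; every config reads the same topic.
    static List<Map<String, String>> sinkConfigs(String topic, List<String> jdbcUrls) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < jdbcUrls.size(); i++) {
            Map<String, String> c = new LinkedHashMap<>();
            c.put("name", "jdbc-sink-" + i); // hypothetical naming scheme, must be unique per connector
            c.put("connector.class", "io.confluent.connect.jdbc.JdbcSinkConnector");
            c.put("tasks.max", "1");
            c.put("topics", topic);
            c.put("connection.url", jdbcUrls.get(i)); // the only setting that differs per sink
            configs.add(c);
        }
        return configs;
    }

    public static void main(String[] args) {
        // Hypothetical databases that all contain the same table.
        List<String> urls = Arrays.asList(
                "jdbc:postgresql://db1.example/app",
                "jdbc:postgresql://db2.example/app");
        for (Map<String, String> config : sinkConfigs("orders", urls)) {
            System.out.println(config);
        }
    }
}
```

Each connector then consumes the topic independently with its own offsets, so a slow or failing database should not hold back the others.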


Best Jan

On 16.09.2017 22:51, M. Manna wrote:

Yes I have, I do need to build and run Schema Registry as a pre-requisite
isn't that correct? because the QuickStart seems to start AVRO - without
AVRO you need your own implementation of transformer/serdes etc.

I am only asking since my deployment platform is Windows Server 2012 - and
Confluent pkg is meant to be run on Linux. I guess there is a lot of manual
conversion I need to do here?

On 16 September 2017 at 21:43, Ted Yu  wrote:


Have you looked at https://github.com/confluentinc/kafka-connect-jdbc ?

On Sat, Sep 16, 2017 at 1:39 PM, M. Manna  wrote:


Sure. But all these are not available via Kafka open source (requires
manual coding), correct? Only Confluent seems to provide some
off-the-shelf connector but Confluent isn't compatible with Windows (yet),
also correct?



On 13 September 2017 at 18:11, Sreejith S  wrote:


This is possible. Once you have records in your put method, it's up to your
logic how you redirect them to multiple JDBC connections for
insertion.

In my use case I have implemented many-to-many sources and sinks.

Regards,
Srijith

On 13-Sep-2017 10:14 pm, "M. Manna"  wrote:

Hi,

I need a little help/suggestion if possible. Does anyone know if it's
possible in Kafka to develop a connector that can sink to multiple JDBC
URLs for the same topic (i.e. table)?

The examples I can see on Confluent talk about one JDBC URL (one-to-one
sink). Would it be possible to achieve a one-to-many?

What I am trying to do is the following:

1) Write to a topic
2) Sink it to multiple DBs (they all will have the same table).

Is this doable/correct way for Connect API?

Kindest Regards,





Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-11 Thread Jan Filipiak
ser anyways to keep his code sane. And it will most
likely be broken up in the interesting parts where IQ would be useful.
The math of how many people are affected by this is therefore not important.
Additionally, the comparison doesn't make sense, as the X-group can still go
with a fluent interface; only the Y-people need to break their fluent
interface.



Guozhang


On Fri, Aug 4, 2017 at 6:18 AM, Jan Filipiak 
wrote:


Hi Guozhang,

thank you very much for the reply. It explained a lot more of your
reasoning to me once again!

I have to disagree with you on the first point. You mentioned the join
case. A join is usually a "logically" materialized table and its
KTableValueGetterSupplier is to be used when one wants to do a lookup.
But this is not at all what is currently happening. The join merge
processor currently maintains its own new state store when join is
invoked with a store name or supplier.

This describes the issue I want to address perfectly. A joined table
doesn't become queryable because it is a JOINEDtable but because it is a
joinedTABLE. The emphasis here is that we put the store logic with the
join and not the table. It is part of the join() method invocation and
not the KTable interface. This abstraction is wrong.

This will always show its ugly face. Let's check your example:

stream.groupByKey(..).aggregate(.., Materialized.as("store1"))
// this resulting KTable is materialized in order to complete the
// aggregation operation
   .filter(Materialized.as("store2"))
// this resulting KTable is not materialized but its
// GetterSupplier is implemented to get values from "store1"

Currently this is only half true. For IQ a store is used that is maintained
by the KTableFilterProcessor, for downstream gets like joins the
ValueGetterSupplier is used
and indeed uses store1.

With the String overload (that you picked here on purpose I guess) it
works more easily, as you can logically map those. But with the
StateStoreSupplier it wouldn't. You could not optimize this away, as the
user is expecting puts and gets to be called on what he supplied.

table1.filter(() -> true, InMemoryStore).filter(()->true,SQlLiteStore)

There is no way to optimize these away.
The same argument as with the join holds for filter. It's not queryable
because it got filtered; it is queryable because it's a KTable. That's
where the emphasis needs to go.

The second point was new to me. So I had to think about this in more
detail.
For me the breaking of the flow comes in very natural.

One Streams app I put most of my heart into has these key metrics:
It has:
8   input topics.
3   1:n Joins
6   Group bys
2   built in Joins
2   built in left joins
some filters and mappers.

This spans 390 lines, counting Java imports and some more stuff.

The whole topology forms a tree in which the input topics usually get
joined and then collected to maps, and then joined again and collected to
maps again, until they get sent to 1 final output topic for consumption in
our application servers.

I would argue it is impossible to express this topology as a chain of
calls. What happened is that usually each join + groupBy tuple became its
own method, taking in the builder and returning the table expressing the
result of the sub-topology. All KTables that meet each other with the same
key in the process get joined (most of that happening on the top level).
This leads to breaks in the fluent interface quite naturally, especially if
you have 2 KTables expressing sub-topologies joined together. One
sub-topology would have to go into the method call, which is unreasonable
IMHO.

Even inside these methods we broke the chains. The variable names we gave
the intermediate KTables really helped in making the semantics clear. They
are much like CTEs in Hive or the required name in MySQL subqueries. They
help to mark milestones inside the topology.

I would argue that for big topologies (I haven't seen others, but I think
it's big) these milestones would be the most important ones for IQ as well.
So I would argue breaking the chains is not really a problem in reality and
it can help in many cases. As I laid out, we broke our chained calls
intuitively and it helped other developers debug the logic a lot, even
without detailed Streams understanding.

If one really does not want to stop the flow, I could argue that one could
either do something like this

KTable joinresult;
KTable t1 = b.table("laa");
KTable t2 = b.table("luu");
(joinresult = t1.join(t2, (value1, value2) -> value1 + value2))
.filter((key, value) -> false);

or write a little snitch like that

KTable rememberTableandContinue(KTable t) {
    joinresult = t;
    return t;
}

for usage as such

rememberTableandContinue(t1.join(t2, (value1, value2) -> value1 + value2))
    .filter((key, value) -> false);

These suggestions might not looks so pr

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-04 Thread Jan Filipiak
ving them separately will increase the readability of topologies by
a lot IMO.

For these quick example topologies that we have floating around in all places:
I am pretty sure one can go unbroken on them, and usually the last table will
be the one that is needed for IQ then.


Thanks again. The second point really got me thinking, as your perspective on
the importance of "not breaking the fluent interface" was not clear to me. I
hope I managed to lay out why I think it shouldn't have such a big weight in
the discussion.

PS.: check out Hive CTEs, everyone loves them and our analytics team is crazy
for them because you can name them and that brings clarity. You get rid of the
nesting and can split everything into logical chunks of SQL. KTable variables
are the CTEs of Kafka Streams.
One can probably sell this to people :)

Best Jan
Enjoyed your feedback! hope mine makes sense




On 03.08.2017 00:10, Guozhang Wang wrote:

Hello Jan,

Thanks for your proposal. As Bill mentioned the main difference is that we
extract the user-customizable materialization logic out of the topology
building DSL workflow. And the main motivations are in two folds:

1) efficiency wise, it allows some KTables to not be materialized if
unnecessary, saving one state store instance and changelog topic.

2) programming wise, it looks nicer to separate the topology construction
code from the KTable materialization for IQ uses code.


Here are my thoughts regarding these two points:

Regarding 1), I think with whichever the public APIs (either Damian's
proposal or yours), we can always apply the internal optimization to not
physically materialize the KTable. You can take a look at the internal
interface of "KTableValueGetterSupplier", which is used exactly for this
purposes such that a get call on a "logically" materialized KTable can be
traced back to its parent KTables that are physically materialized in a
state store. So following proposed APIs, for example:


stream.groupByKey(..).aggregate(.., Materialized.as("store1"))
// this resulting KTable is materialized in order to complete the
// aggregation operation
 .filter(Materialized.as("store2"))
// this resulting KTable is not materialized but its
// GetterSupplier is implemented to get values from "store1"


Or

table1 = stream.groupByKey(..).aggregate(..);
table2 = table1.filter();

table1.queryHandle("store1");   // this resulting KTable is materialized
                                // in order to complete the aggregation operation
table2.queryHandle("store2");   // this resulting KTable is not
                                // materialized but its GetterSupplier is
                                // implemented to get values from "store1"



When a user queries a value for "store2", which is not actually materialized
into a state store, the GetterSupplier will be triggered to in turn query
the store for "store1", and then apply the filter operator on-the-fly to
return the value. So the bottom line is, we can achieve the same efficiency
optimization with either of the public APIs.
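
The lookup path described above can be sketched without any Kafka dependency. This is only an illustration of the idea, not the actual KTableValueGetterSupplier code: `ValueGetter`, `materialized` and `filtered` are hypothetical names, and a plain map stands in for the physically materialized "store1".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

public class FilteredViewSketch {

    /** Read-only lookup, standing in for the role a value getter plays. */
    interface ValueGetter<K, V> {
        V get(K key);
    }

    /** Getter backed by a physically materialized store (here just a map). */
    static <K, V> ValueGetter<K, V> materialized(Map<K, V> store) {
        return store::get;
    }

    /** Getter for a filtered table: it owns no store and applies the filter on read. */
    static <K, V> ValueGetter<K, V> filtered(ValueGetter<K, V> parent,
                                             BiPredicate<K, V> predicate) {
        return key -> {
            V value = parent.get(key);
            // A value that is missing upstream or rejected by the filter looks absent.
            return (value != null && predicate.test(key, value)) ? value : null;
        };
    }

    public static void main(String[] args) {
        Map<String, Long> store1 = new HashMap<>();
        store1.put("a", 5L);
        store1.put("b", 50L);

        ValueGetter<String, Long> table1 = materialized(store1);
        ValueGetter<String, Long> table2 = filtered(table1, (k, v) -> v > 10);

        System.out.println(table2.get("a")); // filtered out on the fly
        System.out.println(table2.get("b")); // served from store1
    }
}
```

The filtered view costs one extra predicate call per lookup but needs no second store and no second changelog topic.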


Regarding 2), I actually have proposed a similar API to yours earlier in
this discussion thread:

---

// specifying the topology, should be concise and conveniently
concatenated, no specs of materialization at all

KStream stream1 = builder.stream();
KTable table1 = stream1.groupby(...).aggregate(initializer, aggregator,
sessionMerger, sessionWindows);  // do not allow to pass-in a state store
supplier here any more

// additional code to the topology above, could be more prescriptive
than descriptive
// only advanced users would want to code in both parts above; while other
users would only code the topology as above.

table1.materialize("queryableStoreName"); // or..
table1.materialize("queryableStoreName").enableCaching().enableLogging();
// or..
table1.materialize(stateStoreSupplier); // we check type (key-value types,
windowed or not etc) at starting time and add the metrics / logging /
caching / windowing wrapper on top of the store, or..
table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); //
etc..

---

But one caveat of that, as illustrated above, is that you need to have
separate object of the KTable in order to call either "queryHandle" or
"materialize" (whatever the function name is) for the specifications of
materialization options. This can break the concatenation of the topology
construction part of the code, that you cannot simply add one operator
directly after another. So I think this is a trade-off we have to make and
the current approach looks better in this regard.



Guozhang




On Wed, Aug 2, 2017 at 2:07 PM, Jan Filipiak
wrote:


Hi Bill,

totally! So in the original discussion it was mentioned that the overloads
are nasty when implementing new features. So we wanted to get r

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-02 Thread Jan Filipiak

Hi Bill,

totally! So in the original discussion it was mentioned that the
overloads are nasty when implementing new features. So we wanted to get
rid of them. But what I felt was that the
copy & pasted code in the KTableProcessors for maintaining IQ stores was
as big a hurdle as the overloads.


With this proposal I try to shift things in the direction of getting
IQ for free if
KTableValueGetterSupplier is properly implemented (like getting join for
free then), instead of having the code for maintaining IQ stores all over
the place. I realized I can do that while getting rid of the overloads;
that makes me feel my proposal is superior.


Further I try to optimize by using as few stores as possible to give the 
user what he needs. That should save all sorts of resources while 
allowing faster rebalances.


The target ultimately is to only have KTableSource and the Aggregators 
maintain a Store and provide a ValueGetterSupplier.


Does this make sense to you?

Best Jan

On 02.08.2017 18:09, Bill Bejeck wrote:

Hi Jan,

Thanks for the effort in putting your thoughts down on paper.

Comparing what I see from your proposal and what is presented in
KIP-182, one of the main differences is the exclusion of
a `Materialized` instance in the `KTable` methods.


Can you go into more detail on why this is so and the specific problems
it avoids and/or solves with this approach?


Thanks!
Bill

On Wed, Aug 2, 2017 at 4:19 AM, Damian Guy wrote:


Hi Jan,

Thanks for taking the time to put this together, appreciated. For the
benefit of others would you mind explaining a bit about your
motivation?

Cheers,
Damian

On Wed, 2 Aug 2017 at 01:40 Jan Filipiak wrote:

> Hi all,
>
> after some further discussions, the best thing to show my idea of how it
> should evolve would be a bigger mock/interface description.
> The goal is to reduce the store-maintaining processors to only the
> Aggregators and KTableSource, while having KTableSource optionally
> materialized.
>
> Introducing KTable::copy() will allow users to maintain state twice if
> they really want to. KStream::join*() wasn't touched. I never personally
> used that so I didn't feel
> comfortable enough touching it. Currently still making up my mind. None
> of the suggestions made it queryable so far. Guozhang's 'Buffered' idea
> seems ideal here.
>
> please have a look. Looking forward to your opinions.
>
> Best Jan
>
>
>
> On 21.06.2017 17:24, Eno Thereska wrote:
> > (cc’ing user-list too)
> >
> > Given that we already have StateStoreSuppliers that are configurable
> > using the fluent-like API, probably it’s worth discussing the other
> > examples with joins and serdes first since those have many overloads and
> > are in need of some TLC.
> >
> > So following your example, I guess you’d have something like:
> > .join()
> > .withKeySerdes(…)
> > .withValueSerdes(…)
> > .withJoinType("outer")
> >
> > etc?
> >
> > I like the approach since it still remains declarative and it’d reduce
> > the number of overloads by quite a bit.
> >
> > Eno
> >
> >> On Jun 21, 2017, at 3:37 PM, Damian Guy wrote:
> >>
> >> Hi,
> >>
> >> I'd like to get a discussion going around some of the API choices we've
> >> made in the DSL. In particular those that relate to stateful operations
> >> (though this could expand).
> >> As it stands we lean heavily on overloaded methods in the API, i.e.,
> >> there are 9 overloads for KGroupedStream.count(..)! It is becoming noisy
> >> and I feel it is only going to get worse as we add more optional params.
> >> In particular we've had some requests to be able to turn caching off, or
> >> change log configs, on a per-operator basis (note this can be done now
> >> if you pass in a StateStoreSupplier, but this can be a bit cumbersome).
> >>
> >> So this is a bit of an open question. How can we change the DSL
> >> overloads so that it flows, is simple to use and understand, and is
> >> easily extended in the future?
> >>
> >> One option would be to use a fluent API approach for providing the
> >> optional params, so something like this
Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-01 Thread Jan Filipiak

Hi all,

after some further discussions, the best thing to show my idea of how it
should evolve would be a bigger mock/interface description.
The goal is to reduce the store-maintaining processors to only the
Aggregators and KTableSource, while having KTableSource optionally
materialized.


Introducing KTable::copy() will allow users to maintain state twice if
they really want to. KStream::join*() wasn't touched. I never personally
used that so I didn't feel
comfortable enough touching it. Currently still making up my mind. None
of the suggestions made it queryable so far. Guozhang's 'Buffered' idea
seems ideal here.


please have a look. Looking forward to your opinions.

Best Jan



On 21.06.2017 17:24, Eno Thereska wrote:

(cc’ing user-list too)

Given that we already have StateStoreSuppliers that are configurable using the 
fluent-like API, probably it’s worth discussing the other examples with joins 
and serdes first since those have many overloads and are in need of some TLC.

So following your example, I guess you’d have something like:
.join()
.withKeySerdes(…)
.withValueSerdes(…)
.withJoinType("outer")

etc?

I like the approach since it still remains declarative and it’d reduce the 
number of overloads by quite a bit.

Eno


On Jun 21, 2017, at 3:37 PM, Damian Guy  wrote:

Hi,

I'd like to get a discussion going around some of the API choices we've
made in the DSL. In particular those that relate to stateful operations
(though this could expand).
As it stands we lean heavily on overloaded methods in the API, i.e., there
are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and I
feel it is only going to get worse as we add more optional params. In
particular we've had some requests to be able to turn caching off, or
change log configs, on a per-operator basis (note this can be done now if
you pass in a StateStoreSupplier, but this can be a bit cumbersome).

So this is a bit of an open question. How can we change the DSL overloads
so that it flows, is simple to use and understand, and is easily extended
in the future?

One option would be to use a fluent API approach for providing the optional
params, so something like this:

groupedStream.count()
   .withStoreName("name")
   .withCachingEnabled(false)
   .withLoggingEnabled(config)
   .table()



Another option would be to provide a Builder to the count method, so it
would look something like this:
groupedStream.count(new
CountBuilder("storeName").withCachingEnabled(false).build())

Another option is to say: Hey we don't need this, what are you on about!

The above has focussed on state store related overloads, but the same ideas
could  be applied to joins etc, where we presently have many join methods
and many overloads.
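
To make the trade-off a bit more concrete, here is a small self-contained sketch of how optional parameters could collapse a family of overloads into one method. It is an illustration only and not a proposed Streams API: `StoreOptions`, `defaults()` and `count(StoreOptions)` are hypothetical names, and only the `with*` method names are borrowed from the example above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class FluentOptionsSketch {

    /** Immutable set of optional store settings; every with* call returns a copy. */
    static final class StoreOptions {
        final String storeName;                    // null means "no queryable name"
        final boolean cachingEnabled;
        final Map<String, String> changelogConfig;

        private StoreOptions(String storeName, boolean cachingEnabled,
                             Map<String, String> changelogConfig) {
            this.storeName = storeName;
            this.cachingEnabled = cachingEnabled;
            this.changelogConfig = changelogConfig;
        }

        static StoreOptions defaults() {
            return new StoreOptions(null, true, Collections.<String, String>emptyMap());
        }

        StoreOptions withStoreName(String name) {
            return new StoreOptions(name, cachingEnabled, changelogConfig);
        }

        StoreOptions withCachingEnabled(boolean enabled) {
            return new StoreOptions(storeName, enabled, changelogConfig);
        }

        StoreOptions withLoggingEnabled(Map<String, String> config) {
            return new StoreOptions(storeName, cachingEnabled,
                    Collections.unmodifiableMap(new HashMap<>(config)));
        }
    }

    /** One entry point instead of many overloads; unset options keep their defaults. */
    static String count(StoreOptions options) {
        return "count(store=" + options.storeName
                + ", caching=" + options.cachingEnabled
                + ", changelog=" + options.changelogConfig + ")";
    }

    public static void main(String[] args) {
        System.out.println(count(StoreOptions.defaults()));
        System.out.println(count(StoreOptions.defaults()
                .withStoreName("name")
                .withCachingEnabled(false)));
    }
}
```

Adding a new optional setting then means adding one field and one `with*` method, rather than doubling the number of overloads.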

Anyway, i look forward to hearing your opinions.

Thanks,
Damian



@InterfaceStability.Evolving
public interface KTable {

KTable filter(final Predicate predicate);
KTable filterNot(final Predicate predicate);
 KTable mapValues(final ValueMapper 
mapper);

KStream toStream();

KTable copy(); // inserts a new KTableSource
KTable copy(Materialized m); // inserts a new KTableSource using
                             // toStream() as parent


   // I see why; I'd rather have users use to + table
KTable through(final String topic);
KTable through(Produced p,
 final String topic);

void to(final String topic);
void to(final Produced p,
        final String topic);

 KGroupedTable groupBy(final KeyValueMapper> selector);
 KGroupedTable groupBy(final KeyValueMapper> selector, Serialized s);

 KTable join(final KTable other,
final ValueJoiner joiner);

 KTable leftJoin(final KTable other,
final ValueJoiner joiner);

 KTable outerJoin(final KTable other,
 final ValueJoiner joiner);

UninitializedQueryHandle QueryHandle(); // causes enable sending old value / materialize



// Currently marked deprecated, easily reproduced by map or similar
void writeAsText(final String filePath);
void writeAsText(final String filePath,
 final String streamName);
void  writeAsText(final String filePath,
  final Serde keySerde,
  final Serde valSerde);
void writeAsText(final String filePath,
 final String streamName,
 final Serde keySerde,
 final Serde valSerde);
void foreach(final ForeachAction action);
}


public interface UninitializedQueryHandle<K, V> {

QueryHandle<K, V> initialize(KafkaStreams ks);
}

public interface QueryHandle<K, V> {

V get(K k);

}

public interface Produced{

static Produced with();

Produced serializer(Serialized s);

Produced partitioner(StreamPartitioner sp);

//sneaky new feature. skipable
Produced internalTopic();
//Hint ignored when cog

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-24 Thread Jan Filipiak

Hi Damian,

thanks for taking the time. I think you read my points individually but
you seem to not understand the bigger picture I am trying to paint.


Of the three problems I mentioned - and that you agreed to be
problems - you are only trying to address the first.


What I am trying to tell you is that if you focus on the latter two, the
first one comes for free. On the other hand, if you focus on the first
(and please allow me to call it the easy part), all you are going to achieve
is to break user land and sugar-coat the real problems.


This takes away overloads, but still leaves it a mess to implement new
features. I am currently trying to prep a patch for KAFKA-3705 and
I do not understand why I should deal with Interactive Queries
whatsoever. My output table has a proper ValueGetterSupplier.

That should be it!

I hope I made clear that to improve here quite some hard work has to be
done and that it would be rewarding, and that just sugar-coating everything
is one of the worst steps we could take from where we are at the moment.

Looking at KAFKA-5581 that you mentioned: none of the points are really
related to what I am saying. Each of these points is tricky and
requires some careful thinking but might work out.

Further, looking at your comment that refers to KIP vs. DISCUSS: I don't
know what I should understand from that.


Regarding your comment mentioning that getQueryHandle() wouldn't work:
it's the same thing as giving the user a queryable string.
It works the same way, with the only difference that we have a wrapper
object that gives the user what he wants instantly! Instead of giving
him a String to get a store, we just give him a store, plus we don't hand
out some inflexible native types that we later on don't have control over.

The whole logic about partitioners and what else does not change.
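
A rough sketch of the wrapper idea, as I read it: the handle remembers only its processor name while the topology is built and is bound to a lookup once the application is running. None of this is real Streams code. `RunningApp` is a hypothetical stand-in for the running KafkaStreams instance, and partitioning and remote lookups are ignored entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class QueryHandleSketch {

    /** What the user ends up holding: a typed lookup, no store name required. */
    interface QueryHandle<K, V> {
        V get(K key);
    }

    /** Hypothetical stand-in for the running application: processor name -> lookup. */
    static final class RunningApp {
        private final Map<String, Function<Object, Object>> getters = new HashMap<>();

        void register(String processorName, Function<Object, Object> getter) {
            getters.put(processorName, getter);
        }

        Function<Object, Object> getterFor(String processorName) {
            return getters.get(processorName);
        }
    }

    /** Created while building the topology; it only remembers its processor name. */
    static final class UninitializedQueryHandle<K, V> {
        private final String processorName;

        UninitializedQueryHandle(String processorName) {
            this.processorName = processorName;
        }

        @SuppressWarnings("unchecked")
        QueryHandle<K, V> initialize(RunningApp app) {
            Function<Object, Object> getter = app.getterFor(processorName);
            if (getter == null) {
                throw new IllegalStateException("unknown processor: " + processorName);
            }
            return key -> (V) getter.apply(key);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> joinResult = new HashMap<>();
        joinResult.put("k", 7);

        RunningApp app = new RunningApp();
        app.register("join-1", joinResult::get); // "join-1" is a made-up processor name

        QueryHandle<String, Integer> handle =
                new UninitializedQueryHandle<String, Integer>("join-1").initialize(app);
        System.out.println(handle.get("k"));
    }
}
```

Compared with handing out a store name, the user never sees the name or the store type, which leaves the library free to change what sits behind the handle.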

Hope this makes my points more clear.

Best Jan


On 19.07.2017 12:03, Damian Guy wrote:

Hi Jan,

Thanks for your input. Comments inline

On Tue, 18 Jul 2017 at 15:21 Jan Filipiak  wrote:


Hi,


1. Too many overloads:
Currently, whenever a KTable is the result of an operation it gets an
overload with stateStoreName, and StateStoreSupplier in case people want
to query that.
This is what produces 2/3rds of the overloaded methods right now (not
counting methods returning KStream)



As you state further down we are trying to address this.



2. Code copy and pasting.
Almost all KTableProcessorSuppliers have the same block of (if(name !=
null) store.put(k,v))



Yes, i agree. That is related to the KTable queryable store etc, and can
easily be addressed, but isn't necessarily part of this as it doesn't need
to be a public interface change, i.e., we can clean that up in the
background.



3. Runtime inefficiencies.
Each queryable table almost instantly causes another store being
required, storing data equivalent to that of upstream KTables.


Agreed. Again, this is not a public interface change. We don't need another
store, i.e., we can just use a "View" on the existing store, which is
basically just using the KTableValueGetter to apply the map or filter
operation to the original store. We also have this jira
https://issues.apache.org/jira/browse/KAFKA-5581 to look into optimizing
when we do and don't need to add additional changelogs.



So I really see us tackling only the first part currently, which in my
opinion is too short-sighted to settle on a public API.


We are not settling on the public API. We do, however need to do KIPs for
public API discussions. For internal changes we don't necessarily need to
have a public discussion about it.



This is why I want to tackle our approach to IQ first, as it seems to me
to be the most disruptive thing, and the cause of most problems.

The Plan:

Table from topic, kstream (don't even like this one, but probably needed
for some kind of enhanced flexibility) or aggregations would be the only
KTables that would get associated with a state store (their processors).
For these operations one can have the "statestoresupplier" overload but
also not the "queryablestatestore" overload. From this point on the KTable
abstraction would be considered restored.
All the overloads of join and through with respect to IQ would go away.
"through" would go completely maybe the benefit added is. The method I
would add is for a table to get a query handle.
This query handle will underneath remember its table's processor name. To
access the data from IQ we would not rely on the "per processor
state store" but go the usual path through ValueGetterSupplier.
*Note:* We do not necessarily have a Serde for V, especially after
mapValues, and also not for any intermediate data types. It would be each
KTableProcessor's job to provide a serialized version of upstream data types.
KTableKTableJoin would need to bring a JoinInputSerializer that
would serialize both upstream values for trans

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-18 Thread Jan Filipiak
ndowed store or key-value store is 
needed for this specific operator in the DSL, what serdes are needed 
for materialize the store, in order to create a StateStoreSupplier 
with caching / logging disabled, and then pass into the DSL.


2) Similarly, if a user just wants to set different topic configs for
the changelog topic, she still needs to pass the whole
StateStoreSupplier into the operator.


3) If a user wants to use a different store engine (e.g. MyStore rather than
RocksDBStore) underneath but does not care about the default settings
for logging, caching, etc, he STILL needs to pass the whole
StateStoreSupplier into the operator.


Note that all the above scenarios are for advanced users who do want
to override these settings; users who are just OK with the default
settings should not be exposed to such APIs at all, like you
said, "I do not be exposed with any of such implementation details",
if you do not care.


-

We have been talking about configs vs. code for such settings,
since we have been using configs for "global" default configs; but the
argument against using configs for such per-operator / per-store
settings as well is that it will simply make configs hard to manage /
hard to wire with tools. Personally speaking, I'm not a big fan of
using configs for per-entity overrides and that is mainly from my
experience with Samza: Samza inherits exactly the same approach for
per-stream / per-source configs:


http://samza.apache.org/learn/documentation/0.13/jobs/configuration-table.html
([system-name][stream-id] etc are all place-holders)


The main issues were 1) users making config changes need to deploy 
this to all the instances, I think for Streams it would be even worse 
as we need to make a config file on each of the running instance, and 
whenever there is a change we need to make sure they are propagated to 
all of them, 2) whenever users make some code changes, e.g. to add a 
new stream / system, they need to remember to set the corresponding 
changes in the config files as well and they kept forgetting about it, 
the lesson learned there was that it is always better to change one 
place (code change) than two (code change + config file change).


Again, this is not saying we have vetoed this option, and if people 
have good reasons for this let's discuss them here.


-

So the current proposals are mainly around keeping configs for the
global default settings, while still allowing users to override
per-operator / per-store settings in the code, while also keeping in
mind not to force users to think about such implementation details if
they are fine with the default settings. For example:


As a normal user it is sufficient to specify an aggregation as

```
table4.join(table5, joiner).table();
```

in which she can still just focus on the computational logic with all
implementation details abstracted away; only if the user is capable
enough with the implementation details (e.g. how the joined tables
are materialized into state stores, etc) and wants to specify her own
settings (e.g. I want to swap in my own state store engine, or I want
to disable caching for dedup, or use a different serde etc) can she
"explore" them with the DSL again:


```
// use a custom store name for interactive query
table4.join(table5, joiner).table(Materialized.as("store1"));
// use a custom store engine
table4.join(table5, joiner).table(Materialized.as(MyStoreSupplier));
// use custom changelog topic configs for the store
table4.join(table5, joiner).table(Materialized.as("store1").withLoggingEnabled(configs));

// ... more
```

Hope it helps.


Guozhang


On Fri, Jul 7, 2017 at 3:42 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:


It makes me want to cry.

Why on earth is the DSL going to expose all its implementation
details now?
Especially being materialized or not.

If we want to take useful steps in that direction, maybe we are
looking for a way to let the user switch back and forth between
PAPI and DSL?

A change like the one proposed would not eliminate any of my pain
points while still being a heck of a lot of work to migrate to.

Since I am only following this from the point where Eno CC'ed it
into the users list:

Can someone please rephrase for me what problem this is trying to
solve? I don't mean to be rude, but it uses a problematic feature,
"StateStoreSuppliers in DSL", to justify making it even worse. This
gets us nowhere in making the configs more flexible; it's just
syntactic sugar.

A low-effort shot would be: let's add a properties argument to
operations that would otherwise become too heavily overloaded? Or
pull the configs by some naming scheme
from the overall properties. A

Re: Where to run kafka-consumer-groups.sh from?

2017-07-11 Thread Jan Filipiak

Hi,

very likely due to timing. What problem is it causing you exactly that 
you want to work around?

These differences shouldn't concern you too much, I guess.

We use the tool across continents and don't worry about it too much. 
The offset commit interval makes everything blurry anyway. If you can 
describe your pain more precisely, maybe we can work around it.
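
To make the timing argument concrete, here is a rough sketch in plain Java. It does not call Kafka at all; the class name, the produce rate and the delays are made-up assumptions. It only shows that if the log-end offset is read some milliseconds after the committed offset, a steadily written partition shows an apparent lag roughly proportional to that delay, even when the consumer is fully caught up.

```java
// Illustrative model only; the numbers are made up and nothing here calls Kafka.
class LagSketch {
    /** Consumer lag as usually defined: log-end offset minus committed offset. */
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    /** Messages appended while the observer waits delayMs between its two reads, assuming a steady rate. */
    static long appendedDuring(long messagesPerSecond, long delayMs) {
        return messagesPerSecond * delayMs / 1000L;
    }

    public static void main(String[] args) {
        long committed = 10_000L; // offset the group last committed (assumed caught up at that moment)
        long rate = 1_000L;       // assumed produce rate, messages per second

        // Observer on the broker host: the two reads happen almost back to back.
        long local = lag(committed + appendedDuring(rate, 1L), committed);
        // Remote observer: the log-end offset is read 50 ms after the committed offset.
        long remote = lag(committed + appendedDuring(rate, 50L), committed);

        System.out.println("local=" + local + " remote=" + remote);
    }
}
```

Under these assumptions the difference between the two hosts is purely an artifact of when the two offsets are sampled, not a sign that either reading is wrong.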


Best Jan

On 10.07.2017 10:31, Dmitriy Vsekhvalnov wrote:

Guys, let me bump this one again. Still looking for comments about the
kafka-consumer-groups.sh tool.

Thank you.

On Fri, Jul 7, 2017 at 3:14 PM, Dmitriy Vsekhvalnov 
wrote:


I've tried 3 brokers on the command line, like this:

/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server
broker:9092,broker_2:9092,broker_3:9092 --new-consumer --group
logging-svc --describe

it doesn't make any difference; there is still a 10x difference in the figures when
running on a broker host vs. remote.

Here is a snippet from the console output (are you looking for something specific in
it? It looks normal as far as I can tell):


TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
test.topic  54 4304430935  consumer-26-21f5050c-a43c-4254-bfcf-42e17dbdb651
test.topic  40 4426443610  consumer-21-24f3ebca-004f-4aac-a348-638c9c6a02f0
test.topic  59 4414442063  consumer-27-ed34f1b3-1be9-422b-bb07-e3c9913195c7
test.topic  42 4389440376  consumer-22-75c2fc0a-5d5c-472d-b27e-e873030f82b6
test.topic  27 4416442224  consumer-18-3be20568-8dd3-4679-a008-0ca64d31083c




On Fri, Jul 7, 2017 at 2:52 PM, M. Manna  wrote:


kafka-consumer-groups.sh --bootstrap-server broker:9092 --new-consumer
--group service-group --describe

how many brokers do you have in the cluster? if you have more than one,
list them all using a comma csv with --bootstrap-server.

Also, could you paste some results from the console printout?

On 7 July 2017 at 12:47, Dmitriy Vsekhvalnov 
wrote:


Hi all,

question about lag checking. We've tried to periodically sample consumer
lag with:

kafka-consumer-groups.sh --bootstrap-server broker:9092 --new-consumer
--group service-group --describe

it's all fine, but depending on the host we run it from, it gives different
results.

E.g.:

   - when running from one of the broker hosts itself, we get figures close
     to 0.
   - when running from a remote host, we get 30-60 on average (I suspect
     there are multiple remote calls to the broker involved, so the
     difference is due to timing).


My question is: what is the correct way to use it? From the broker host itself?







Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-09 Thread Jan Filipiak
 is inline with my comment above.



Also, providing Serdes by config is neat. One wouldn't even need to go into
the code then, which would also save a ton. (We have the default ones in conf;
why not override the specific ones?)

I am not sure if Serdes are really a config. I mean, the data types are
hard coded into the code, so it does make sense to specify the Serdes
accordingly. I am also not sure how we would map Serdes from the config
to the corresponding operator?

True! Maybe not an ideal case where configs help with overloading. I 
guess people are either using the global untyped one or a typed one for 
all steps.
So the state store is probably a better case. It's always going to be 
referenced by a name anyway, so one could use this name to provide 
additional configs to the state store.

Probably also defining a factory used to build it.

Similarly, a join has some sort of name; currently it's 3 names, which 
would need unifying to some degree, but then the joins could also be 
addressed with configs.
But joins don't seem to have the too-heavy overloading problem (only 
store related :D). But to be honest I can't judge the usefulness of 
outer and left. Not a pattern
that I have come across yet; for us it's always inner. Maybe materialized but 
not sending old values, is that what it does? Sorry, can't wrap my head 
around that just now,

heading towards 3am.

The example I provided was

streams.$applicationid.stores.$storename.inmemory = false
streams.$applicationid.stores.$storename.cachesize = 40k

for the configs. The Query Handle thing makes sense, hopefully.
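
A minimal sketch of how such a naming scheme could be resolved, using only java.util.Properties. The helper class and method names are hypothetical and not part of Kafka Streams; the key layout simply follows the streams.$applicationid.stores.$storename.* example above.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper for illustration; Kafka Streams does not ship this class.
class StoreConfigs {
    /** Collects every "streams.<appId>.stores.<storeName>.<key>" entry as <key> -> value. */
    static Map<String, String> forStore(Properties props, String appId, String storeName) {
        String prefix = "streams." + appId + ".stores." + storeName + ".";
        Map<String, String> result = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                // Strip the prefix so the store only sees its own setting names.
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("streams.myapp.stores.store1.inmemory", "false");
        props.setProperty("streams.myapp.stores.store1.cachesize", "40k");
        props.setProperty("streams.myapp.stores.other.inmemory", "true");
        System.out.println(forStore(props, "myapp", "store1"));
    }
}
```

The store name is the only thing the code has to agree on with the config; everything else can change without touching the topology.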

Best Jan



-Matthias


On 7/8/17 2:23 AM, Jan Filipiak wrote:

Hi Matthias thanks,

Exactly what I was guessing.

I don't understand why we need custom stores in the DSL, and I don't understand
why we are not considering a more generic config-based approach.

StateStores in DSL => what I really think we are looking for is PAPI => DSL
=> PAPI back-and-forth switcharoo capabilities.

Looking at the most overloaded method that I can currently find, "through()": 2
of the overloads come from the broken idea of "the user provides a name for the
state store for IQ" and custom state stores.
From the beginning I said that's madness. That is the real disease we
need to fix IMHO. To be honest I also don't understand why through() with a
state store is particularly useful; second unique key maybe?

Also, providing Serdes by config is neat. One wouldn't even need to go into
the code then, which would also save a ton. (We have the default ones in conf;
why not override the specific ones?)

Does this make sense to people? Which pieces should I outline with code?
(Time is currently scarce :( but I can pull off some smaller examples, I
guess.)

Best Jan





On 08.07.2017 01:23, Matthias J. Sax wrote:

It's two issues we want to tackle:

   - too many overloads (for some methods we already have more than 10)
   - improve custom store API

-Matthias


On 7/7/17 3:42 PM, Jan Filipiak wrote:

It makes me want to cry.

Why on earth is the DSL going to expose all its implementation
details now?
Especially being materialized or not.

If we want to take useful steps in that direction, maybe we are looking
for a way to let the user switch back and forth between PAPI and DSL?

A change like the one proposed would not eliminate any of my pain points
while still being a heck of a lot of work to migrate to.

Since I am only following this from the point where Eno CC'ed it into
the users list:

Can someone please rephrase for me what problem this is trying to solve?
I don't mean to be rude, but it uses a problematic feature,
"StateStoreSuppliers in DSL", to justify making it even worse. This gets
us nowhere in making the configs more flexible; it's just syntactic
sugar.

A low-effort shot would be: let's add a properties argument to operations
that would otherwise become too heavily overloaded? Or pull the configs
by some naming scheme from the overall properties. In addition to that,
we get rid of StateStoreSuppliers in the DSL and have them also
configured by said properties.

=> way easier to migrate to, way less risk, way more flexible in the
future (different implementations of the same operation don't require a
code change to configure)

Line 184 especially makes no sense to me. What is a non-materialized
KTable-KTable join anyway?

Hope we can discuss more on this.



On 07.07.2017 17:23, Guozhang Wang wrote:

I messed up the indentation in the GitHub code repo; this would be easier to
read:

https://codeshare.io/GLWW8K


Guozhang


On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang 
wrote:


Hi Damian / Kyle,

I think I agree with you guys about the pros / cons of using the builder
pattern vs. using some "secondary classes". And I'm wondering if we can
take a middle ground between these two. I spent some time on a slightly
different approach from Damian's current proposal:

https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-08 Thread Jan Filipiak

Hi Matthias thanks,

Exactly what I was guessing.

I don't understand why we need custom stores in the DSL, and I don't understand 
why we are not considering a more generic config-based approach.


StateStores in DSL => what I really think we are looking for is PAPI => DSL 
=> PAPI back-and-forth switcharoo capabilities.


Looking at the most overloaded method that I can currently find, "through()": 2 
of the overloads come from the broken idea of "the user provides a name for the 
state store for IQ" and custom state stores.
From the beginning I said that's madness. That is the real disease we 
need to fix IMHO. To be honest I also don't understand why through() with a 
state store is particularly useful; second unique key maybe?


Also, providing Serdes by config is neat. One wouldn't even need to go into 
the code then, which would also save a ton. (We have the default ones in conf; 
why not override the specific ones?)


Does this make sense to people? Which pieces should I outline with code? 
(Time is currently scarce :( but I can pull off some smaller examples, I 
guess.)


Best Jan





On 08.07.2017 01:23, Matthias J. Sax wrote:

It's two issues we want to tackle:

  - too many overloads (for some methods we already have more than 10)
  - improve custom store API

-Matthias


On 7/7/17 3:42 PM, Jan Filipiak wrote:

It makes me want to cry.

Why on earth is the DSL going to expose all its implementation details now?
Especially being materialized or not.

If we want to take useful steps in that direction, maybe we are looking
for a way to let the user switch back and forth between PAPI and DSL?

A change like the one proposed would not eliminate any of my pain points
while still being a heck of a lot of work to migrate to.

Since I am only following this from the point where Eno CC'ed it into
the users list:

Can someone please rephrase for me what problem this is trying to solve?
I don't mean to be rude, but it uses a problematic feature,
"StateStoreSuppliers in DSL", to justify making it even worse. This gets
us nowhere in making the configs more flexible; it's just syntactic sugar.

A low-effort shot would be: let's add a properties argument to operations
that would otherwise become too heavily overloaded? Or pull the configs
by some naming scheme from the overall properties. In addition to that,
we get rid of StateStoreSuppliers in the DSL and have them also
configured by said properties.

=> way easier to migrate to, way less risk, way more flexible in the
future (different implementations of the same operation don't require a
code change to configure)

Line 184 especially makes no sense to me. What is a non-materialized
KTable-KTable join anyway?

Hope we can discuss more on this.



On 07.07.2017 17:23, Guozhang Wang wrote:

I messed up the indentation in the GitHub code repo; this would be easier to
read:

https://codeshare.io/GLWW8K


Guozhang


On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang  wrote:


Hi Damian / Kyle,

I think I agree with you guys about the pros / cons of using the builder
pattern vs. using some "secondary classes". And I'm wondering if we can
take a middle ground between these two. I spent some time on a slightly
different approach from Damian's current proposal:

https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/java/org/apache/kafka/streams/RefactoredAPIs.java

The key idea is to allow the final "table()" or "stream()" function to
"upgrade" from the secondary classes to the first-class citizen classes,
while having all the specs inside this function. This proposal also
includes some other refactorings that people have been discussing for the
builder to reduce the overloaded functions as well. WDYT?


Guozhang


On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy  wrote:


Hi Jan,

Thanks very much for the input.

On Tue, 4 Jul 2017 at 08:54 Jan Filipiak 
wrote:


Hi Damian,

I do see your point that something needs to change. But I fully agree with
Guozhang when he says:
---

But since this is an incompatible change, and we are going to remove the
compatibility annotations soon, it means we only have one chance and we
really have to make it right.




I think we all agree on this one! Hence the discussion.



I fear none of the suggestions go far enough to become something that will
carry on for very much longer.
I am currently working on KAFKA-3705 and trying to find the easiest way for
the user to give me all the required functionality. The easiest interface I
could come up with so far can be looked at here:


https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622

And it's already horribly complicated. I am currently unable to find the
right abstraction level to have everything fall into place naturally. To
be honest, I already think introducing



To be fair that is not a p

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-07 Thread Jan Filipiak

It makes me want to cry.

Why on earth is the DSL going to expose all its implementation details now?
Especially being materialized or not.

If we want to take useful steps in that direction, maybe we are looking 
for a way to let the user switch back and forth between PAPI and DSL?


A change like the one proposed would not eliminate any of my pain points 
while still being a heck of a lot of work to migrate to.


Since I am only following this from the point where Eno CC'ed it into 
the users list:


Can someone please rephrase for me what problem this is trying to solve? 
I don't mean to be rude, but it uses a problematic feature,
"StateStoreSuppliers in DSL", to justify making it even worse. This gets 
us nowhere in making the configs more flexible; it's just syntactic sugar.


A low-effort shot would be: let's add a properties argument to operations 
that would otherwise become too heavily overloaded? Or pull the configs 
by some naming scheme from the overall properties. In addition to that, 
we get rid of StateStoreSuppliers in the DSL and have them also 
configured by said properties.


=> way easier to migrate to, way less risk, way more flexible in the 
future (different implementations of the same operation don't require a 
code change to configure)


Line 184 especially makes no sense to me. What is a non-materialized 
KTable-KTable join anyway?


Hope we can discuss more on this.



On 07.07.2017 17:23, Guozhang Wang wrote:

I messed up the indentation in the GitHub code repo; this would be easier to read:

https://codeshare.io/GLWW8K


Guozhang


On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang  wrote:


Hi Damian / Kyle,

I think I agree with you guys about the pros / cons of using the builder
pattern vs. using some "secondary classes". And I'm wondering if we can
take a middle ground between these two. I spent some time on a slightly
different approach from Damian's current proposal:

https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/java/org/apache/kafka/streams/RefactoredAPIs.java

The key idea is to allow the final "table()" or "stream()" function to
"upgrade" from the secondary classes to the first-class citizen classes, while
having all the specs inside this function. This proposal also includes some
other refactorings that people have been discussing for the builder to
reduce the overloaded functions as well. WDYT?


Guozhang


On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy  wrote:


Hi Jan,

Thanks very much for the input.

On Tue, 4 Jul 2017 at 08:54 Jan Filipiak 
wrote:


Hi Damian,

I do see your point that something needs to change. But I fully agree with
Guozhang when he says:
---

But since this is an incompatible change, and we are going to remove the
compatibility annotations soon, it means we only have one chance and we
really have to make it right.




I think we all agree on this one! Hence the discussion.



I fear none of the suggestions go far enough to become something that will
carry on for very much longer.
I am currently working on KAFKA-3705 and trying to find the easiest way for
the user to give me all the required functionality. The easiest interface I
could come up with so far can be looked at here:


https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622



And it's already horribly complicated. I am currently unable to find the
right abstraction level to have everything fall into place naturally. To
be honest, I already think introducing



To be fair that is not a particularly easy problem to solve!



https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493

was not ideal and makes everything a mess.


I'm not sure I agree that it makes everything a mess, but it could have
been done differently.

The JoinType:Whatever is also not really flexible. Two things come to my
mind:

1. I don't think we should rule out config-based decisions, say configs
like

 streams.$applicationID.joins.$joinname.conf = value


Is this just for config? Or are you suggesting that we could somehow
"code" the join in a config file?



This can allow for tremendous changes without a single API change, and IMO
it was not considered enough yet.

2. Push logic from the DSL to the callback classes. A ValueJoiner, for
example, can be used to implement different join types as the user wishes.

Do you have an example of how this might look?



As Guozhang said: not breaking users is very important.


Of course. We want to make it as easy as possible for people to use
streams.


especially with these changes + all the plans I sadly only have in my head,
but hopefully the first link can give a glimpse.

Thanks for preparing the examples made it way clearer to me 

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-04 Thread Jan Filipiak

Hi Damian,

I do see your point that something needs to change. But I fully agree with 
Guozhang when he says:

---

But since this is an incompatible change, and we are going to remove the
compatibility annotations soon, it means we only have one chance and we
really have to make it right.


I fear none of the suggestions go far enough to become something that will carry 
on for very much longer.
I am currently working on KAFKA-3705 and trying to find the easiest way for the 
user to give me all the required functionality. The easiest interface I could 
come up with so far can be looked at here:

https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622

And it's already horribly complicated. I am currently unable to find the right 
abstraction level to have everything fall into place naturally. To be honest, 
I already think introducing

https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493

was not ideal and makes everything a mess. The JoinType:Whatever is also not 
really flexible. Two things come to my mind:

1. I don't think we should rule out config-based decisions, say configs like
streams.$applicationID.joins.$joinname.conf = value
This can allow for tremendous changes without a single API change, and IMO it was 
not considered enough yet.

2. Push logic from the DSL to the callback classes. A ValueJoiner, for example, 
can be used to implement different join types as the user wishes.
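
A minimal sketch of what point 2 could look like, in plain Java without any Kafka dependency. The NullableJoiner interface and the join() helper are hypothetical stand-ins, not the Kafka Streams ValueJoiner; the assumption is that the framework offers every key seen on either side (with null for a missing side) and treats a null result as "drop this key", so the callback alone decides between inner, left and outer semantics.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

class JoinerDemo {
    /** Hypothetical stand-in for a joiner callback; either argument may be null. */
    interface NullableJoiner<L, R, O> {
        O apply(L left, R right); // returning null drops the key from the result
    }

    /** Offers every key seen on either side; the joiner decides what survives. */
    static <K extends Comparable<K>, L, R, O> Map<K, O> join(
            Map<K, L> left, Map<K, R> right, NullableJoiner<L, R, O> joiner) {
        TreeSet<K> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());
        Map<K, O> out = new TreeMap<>();
        for (K key : keys) {
            O joined = joiner.apply(left.get(key), right.get(key));
            if (joined != null) {
                out.put(key, joined);
            }
        }
        return out;
    }

    // The join type lives entirely in the callback.
    static final NullableJoiner<String, String, String> INNER =
            (l, r) -> (l == null || r == null) ? null : l + "+" + r;
    static final NullableJoiner<String, String, String> LEFT =
            (l, r) -> (l == null) ? null : l + "+" + r;
    static final NullableJoiner<String, String, String> OUTER =
            (l, r) -> l + "+" + r;

    public static void main(String[] args) {
        Map<String, String> left = Map.of("a", "L1", "b", "L2");
        Map<String, String> right = Map.of("b", "R2", "c", "R3");
        System.out.println("inner: " + join(left, right, INNER));
        System.out.println("left:  " + join(left, right, LEFT));
        System.out.println("outer: " + join(left, right, OUTER));
    }
}
```

With this shape the DSL needs a single join method, and adding a new join flavour means writing a new callback rather than a new overload.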

As Guozhang said: not breaking users is very important, especially with 
these changes + all the plans I sadly only have in my head, but hopefully the 
first link can give a glimpse.

Thanks for preparing the examples; they made it way clearer to me what exactly we are 
talking about. I would argue to go a bit slower and more carefully on this one. 
At some point we need to get it right. Peeking over at the Hadoop guys with 
their huge userbase: config files really work well for them.

Best Jan





On 30.06.2017 09:31, Damian Guy wrote:

Thanks Matthias

On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax  wrote:


I am just catching up on this thread, so sorry for the long email in
advance... Also, it's to some extent a dump of thoughts and not always a
clear proposal. I still need to think about this in more detail. But maybe
it helps others to get new ideas :)



However, I don't understand your argument about putting aggregate()
after the withXX() -- all the calls to withXX() set optional parameters
for aggregate() and not for groupBy() -- but a groupBy().withXX()
indicates that the withXX() belongs to the groupBy(). IMHO, this might
be quite confusing for developers.



I see what you are saying, but the grouped stream is effectively a no-op
until you call one of the aggregate/count/reduce etc functions. So the
optional params are ones that are applicable to any of the operations you
can perform on this grouped stream. Then the final
count()/reduce()/aggregate() call has any of the params that are
required/specific to that function.


I understand your argument, but I don't share the conclusion. If we
need a "final/terminal" call, the better way might be

.groupBy().count().withXX().build()

(with a better name for build() though)



The point is that all the other calls, i.e., withBlah, windowed, etc., apply
to all the aggregate functions. The terminal call is the actual type of
aggregation you want to do. I personally find this more natural than
groupBy().count().withBlah().build()



groupedStream.count(/** non windowed count**/)
groupedStream.windowed(TimeWindows.of(10L)).count(...)
groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)


I like this. However, I don't see a reason to have windowed() and
sessionWindowed(). We should have one top-level `Windows` interface that
both `TimeWindows` and `SessionWindows` implement and just have a single
windowed() method that accepts all `Windows`. (I did not like the
separation of `SessionWindows` in the first place, and this seems to be
an opportunity to clean this up. It was hard to change when we
introduced session windows)
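
A small sketch of the "one top-level window type, one method" idea, in plain Java. All type and method names here are hypothetical and are not the Kafka Streams classes; the point is only that a shared interface lets a single entry point accept both time and session windows.

```java
// Hypothetical types for illustration only; they are not the Kafka Streams classes.
final class GroupedSketch {
    // One entry point for every window type, instead of windowed() plus sessionWindowed().
    static String windowedBy(WindowSpec spec) {
        return "count windowed by " + spec.describe();
    }

    public static void main(String[] args) {
        System.out.println(windowedBy(new TimeWindowSpec(10L)));
        System.out.println(windowedBy(new SessionWindowSpec(10L)));
    }
}

/** Common supertype that both window kinds implement. */
interface WindowSpec {
    String describe();
}

final class TimeWindowSpec implements WindowSpec {
    private final long sizeMs;

    TimeWindowSpec(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    @Override
    public String describe() {
        return "time(size=" + sizeMs + "ms)";
    }
}

final class SessionWindowSpec implements WindowSpec {
    private final long inactivityGapMs;

    SessionWindowSpec(long inactivityGapMs) {
        this.inactivityGapMs = inactivityGapMs;
    }

    @Override
    public String describe() {
        return "session(gap=" + inactivityGapMs + "ms)";
    }
}
```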


Yes - true we should look into that.



Btw: we do use the imperative groupBy() and groupByKey(), and thus we
might also want to use windowBy() (instead of windowed()). Not sure how
important this is, but it seems to be inconsistent otherwise.



Makes sense



About joins:  I don't like .withJoinType(JoinType.LEFT) at all. I think,
defining an inner/left/outer join is not an optional argument but a
first class concept and should have a proper representation in the API
(like the current methods join(), leftJoin, outerJoin()).



Yep, i did originally have it as a required param and maybe that is what we
go with. It could have a default, but maybe that is confusing.




About the two join API proposals, the second one has 

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-24 Thread Jan Filipiak

I am with Guozhang here.

I think all the suggestions are far too short-sighted. Especially this 
weird materialize(String) call is totally broken, and people will go nuts 
about how this will look. + Implementing more and better joins, not this 
weird one we currently have. Implementing a one-to-many join, I couldn't 
get away without 3 highly complex value mappers


   final ValueMapper keyExtractor,
   final ValueMapper joinPrefixFaker,
   final ValueMapper leftKeyExtractor,


in addition to the one joiner of course

   final ValueJoiner joiner,

How to specify whether it's outer or inner is for sure the smallest problem we 
are going to face with proper join semantics. What the resulting key 
will be is also highly debatable. What happens to the key is very 
complex, and the API has to tell the user.


To bring this discussion in a good direction, we would need sample 
interfaces we could mock against (as Guozhang suggested) + we need to 
know how the implementation (of joins especially) will look later. I 
strongly recommend stopping the usage of ChangeSerde and having "properly" 
repartitioned topics. That is just sane IMO.


Best Jan




On 22.06.2017 11:54, Eno Thereska wrote:

Note that while I agree with the initial proposal (withKeySerdes, withJoinType, 
etc), I don't agree with things like .materialize(), .enableCaching(), 
.enableLogging().

The former maintain the declarative DSL, while the latter break the declarative 
part by mixing system decisions into the DSL. I think there is a difference 
between the two proposals.

Eno


On 22 Jun 2017, at 03:46, Guozhang Wang  wrote:

I have been thinking about reducing all these overloaded functions for
stateful operations (there are some other places that introduces overloaded
functions but let's focus on these only in this discussion), what I used to
have is to use some "materialize" function on the KTables, like:

---

// specifying the topology

KStream stream1 = builder.stream();
// do not allow passing in a state store supplier here any more
KTable table1 = stream1.groupby(...).aggregate(initializer, aggregator, sessionMerger, sessionWindows);

// additional specs along with the topology above

table1.materialize("queryableStoreName"); // or..
table1.materialize("queryableStoreName").enableCaching().enableLogging(); // or..
// add the metrics / logging / caching / windowing functionalities on top of the store, or..
table1.materialize(stateStoreSupplier);
table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); // etc..

---

But thinking about it more, I feel Damian's first proposal is better, since
my proposal would likely break the chaining (e.g. we may not be
able to do sth. like "table1.filter().map().groupBy().aggregate()" if we
want to use different specs for the intermediate filtered KTable).


But since this is an incompatible change, and we are going to remove the
compatibility annotations soon, it means we only have one chance and we
really have to make it right. So I'd call on anyone to try to rewrite
your examples / demo code with the proposed new API and see if it feels
natural. For example, if I want to use a different storage engine than the
default RocksDB engine, how could I easily specify that with the proposed
APIs?

Meanwhile Damian could you provide a formal set of APIs for people to
exercise on them? Also could you briefly describe how custom storage
engines could be swapped in with the above APIs?



Guozhang


On Wed, Jun 21, 2017 at 9:08 AM, Eno Thereska 
wrote:


To make it clear, it’s outlined by Damian, I just copy pasted what he told
me in person :)

Eno


On Jun 21, 2017, at 4:40 PM, Bill Bejeck  wrote:

+1 for the approach outlined above by Eno.

On Wed, Jun 21, 2017 at 11:28 AM, Damian Guy 

wrote:

Thanks Eno.

Yes i agree. We could apply this same approach to most of the operations
where we have multiple overloads, i.e., we have a single method for each
operation that takes the required parameters and everything else is
specified as you have done above.

On Wed, 21 Jun 2017 at 16:24 Eno Thereska 

wrote:

(cc’ing user-list too)

Given that we already have StateStoreSuppliers that are configurable using
the fluent-like API, probably it’s worth discussing the other examples with
joins and serdes first since those have many overloads and are in need of
some TLC.

So following your example, I guess you’d have something like:
.join()
  .withKeySerdes(…)
  .withValueSerdes(…)
  .withJoinType("outer")

etc?

I like the approach since it still remains declarative and it’d reduce the
number of overloads by quite a bit.

Eno


On Jun 21, 2017, at 3:37 PM, Damian Guy  wrote:

Hi,

I'd like to get a discussion going around some of the API choices we've
made in the DSL. In particular tho

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-22 Thread Jan Filipiak

Hi Eno,

I am less interested in the user-facing interface and more in the actual 
implementation. Any hints where I can follow the discussion on this? I 
still want to discuss upstreaming of KAFKA-3705 with someone.


Best Jan


On 21.06.2017 17:24, Eno Thereska wrote:

(cc’ing user-list too)

Given that we already have StateStoreSuppliers that are configurable using the 
fluent-like API, probably it’s worth discussing the other examples with joins 
and serdes first since those have many overloads and are in need of some TLC.

So following your example, I guess you’d have something like:
.join()
.withKeySerdes(…)
.withValueSerdes(…)
.withJoinType("outer")

etc?

I like the approach since it still remains declarative and it’d reduce the 
number of overloads by quite a bit.

Eno


On Jun 21, 2017, at 3:37 PM, Damian Guy  wrote:

Hi,

I'd like to get a discussion going around some of the API choices we've
made in the DSL. In particular those that relate to stateful operations
(though this could expand).
As it stands we lean heavily on overloaded methods in the API, e.g., there
are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and I
feel it is only going to get worse as we add more optional params. In
particular we've had some requests to be able to turn caching off, or
change log configs, on a per-operator basis (note this can be done now if
you pass in a StateStoreSupplier, but this can be a bit cumbersome).

So this is a bit of an open question. How can we change the DSL overloads
so that it flows, is simple to use and understand, and is easily extended
in the future?

One option would be to use a fluent API approach for providing the optional
params, so something like this:

groupedStream.count()
   .withStoreName("name")
   .withCachingEnabled(false)
   .withLoggingEnabled(config)
   .table()



Another option would be to provide a Builder to the count method, so it
would look something like this:
groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
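
To make the trade-off easier to compare, here is a self-contained sketch of an options object that could back either style. CountOptions and its methods are hypothetical names chosen for illustration, not an existing Kafka Streams class; the defaults (caching on, no changelog overrides) are assumptions as well.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical options holder for illustration; not an existing Kafka Streams class.
final class CountOptions {
    private String storeName;                 // null: let the library pick a name
    private boolean cachingEnabled = true;    // assumed default
    private Map<String, String> changelogConfig = Collections.emptyMap();

    static CountOptions defaults() {
        return new CountOptions();
    }

    CountOptions withStoreName(String name) {
        this.storeName = name;
        return this;
    }

    CountOptions withCachingEnabled(boolean enabled) {
        this.cachingEnabled = enabled;
        return this;
    }

    CountOptions withLoggingEnabled(Map<String, String> config) {
        this.changelogConfig = new HashMap<>(config); // defensive copy
        return this;
    }

    String storeName() { return storeName; }
    boolean cachingEnabled() { return cachingEnabled; }
    Map<String, String> changelogConfig() { return changelogConfig; }

    public static void main(String[] args) {
        // A single count(CountOptions) method could then stand in for the many overloads.
        CountOptions options = CountOptions.defaults()
                .withStoreName("name")
                .withCachingEnabled(false)
                .withLoggingEnabled(Map.of("retention.ms", "86400000"));
        System.out.println(options.storeName() + " caching=" + options.cachingEnabled()
                + " changelog=" + options.changelogConfig());
    }
}
```

New optional settings then become new with...() methods on the options type, and the operator's own signature stays unchanged.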

Another option is to say: Hey we don't need this, what are you on about!

The above has focussed on state store related overloads, but the same ideas
could  be applied to joins etc, where we presently have many join methods
and many overloads.

Anyway, i look forward to hearing your opinions.

Thanks,
Damian




Re: Reliably implementing global KeyValueStore#get

2017-06-07 Thread Jan Filipiak

Depends; an embedded Postgres puts you in the same spot.

But if you use your state store changelog to materialize into a 
Postgres, that might work out decently.
The current JDBC connector doesn't support deletes, which is an issue, but 
writing a custom sink is not too hard.


Best Jan


On 07.06.2017 23:47, Steven Schlansker wrote:

I was actually considering writing my own KeyValueStore backed
by e.g. a Postgres or the like.

Is there some feature Connect gains me that would make it better
than such an approach?

thanks


On Jun 7, 2017, at 2:20 PM, Jan Filipiak  wrote:

Hi,

have you thought about using connect to put data into a store that is more 
reasonable for your kind of query requirements?

Best Jan

On 07.06.2017 00:29, Steven Schlansker wrote:

On Jun 6, 2017, at 2:52 PM, Damian Guy  wrote:

Steven,

In practice, data shouldn't be migrating that often. If it is then you
probably have bigger problems.

Understood and agreed, but when designing distributed systems, it usually
helps to model for the worst case rather than the "well that should never
happen" case, lest you find yourself fixing those bugs at 3am instead :)

I'd like to be able to induce extreme pain at the Kafka layer (change leader
every 3 seconds and migrate all partitions around randomly) and still have
my app behave correctly.


You should be able to use the metadata api
to find the instance the key should be on and then when you check that node
you can also check with the metadata api that the key should still be on
this host. If streams is rebalancing while you query an exception will be
raised and you'll need to retry the request once the rebalance has
completed.

Agreed here as well.  But let's assume I have a very fast replication
setup (assume it takes zero time, for the sake of argument) -- I'm fairly
sure there's still a race here as this exception only fires *during a migration*
not *after a migration that may have invalidated your metadata lookup completes*


HTH,
Damian

On Tue, 6 Jun 2017 at 18:11 Steven Schlansker 
wrote:


On Jun 6, 2017, at 6:16 AM, Eno Thereska  wrote:

Hi Steven,

Do you know beforehand if a key exists? If you know that and are getting
null() the code will have to retry by refreshing the metadata and going to
the new instance. If you don’t know beforehand if a key exists or not you
might have to check all instances of a store to make sure.

No, I am not presupposing that the key can exist -- this is a user visible
API and will be prone to "accidents" :)

Thanks for the insight.  I worry that even checking all stores is not
truly sufficient, as querying all the different workers at different times
in the presence of migrating data can still in theory miss it given
pessimal execution.

I'm sure I've long wandered off into the hypothetical, but I dream of some
day being cool like Jepsen :)


Eno



On Jun 5, 2017, at 10:12 PM, Steven Schlansker <sschlans...@opentable.com> wrote:

Hi everyone, me again :)

I'm still trying to implement my "remoting" layer that allows
my clients to see the partitioned Kafka Streams state
regardless of which instance they hit.  Roughly, my lookup is:

Message get(Key key) {
  RemoteInstance instance = selectPartition(key);
  return instance.get(key); // http remoting
}

RemoteInstance.get(Key key) { // http endpoint
  return readOnlyKeyValueStore.get(key);
}

However, the mapping of partitions to instances may change.
If you call KeyValueStore.get(K) where K is on a partition you
don't own, it returns null.  This is indistinguishable from a
successful get on a key that doesn't exist.

If one instance selects a sibling instance right as the partition is failing
off of that instance, it may get routed there and by the time it gets
the request no longer "owns" the partition -- returns a false 'null'.

You can try re-checking after you get a null value, but that's susceptible
to the same race -- it's unlikely but possible that the data migrates *back*
before you do this re-check.

Is there any way to correctly implement this without races?  I'd imagine
you need a new primitive like KeyValueStore#get that atomically finds
the key or throws an exception if it is not in an owned partition
at the time of lookup so you know to recheck the partition and retry.
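
A rough sketch of the primitive described above, using an in-memory stand-in rather than a real state store. OwnedPartitionStore, NotOwnedException and getOrThrow are hypothetical names, and the hash-based partitioning is an assumption for illustration. The only point is that the ownership check and the read happen under one lock, so a caller can tell "not on this instance" apart from "key does not exist".

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical exception; Kafka Streams does not define this type.
final class NotOwnedException extends RuntimeException {
    NotOwnedException(int partition) {
        super("partition " + partition + " is not owned by this instance");
    }
}

// In-memory stand-in for the local state of one instance (illustration only).
final class OwnedPartitionStore {
    private final int numPartitions;
    private final Set<Integer> owned = new HashSet<>();
    private final Map<String, String> data = new HashMap<>();

    OwnedPartitionStore(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Assumed partitioner: plain hash modulo partition count.
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    synchronized void assign(int partition) { owned.add(partition); }

    synchronized void revoke(int partition) { owned.remove(partition); }

    synchronized void put(String key, String value) { data.put(key, value); }

    // Ownership check and read share one lock, so a revoke cannot slip in between.
    synchronized Optional<String> getOrThrow(String key) {
        int partition = partitionFor(key);
        if (!owned.contains(partition)) {
            throw new NotOwnedException(partition);
        }
        return Optional.ofNullable(data.get(key));
    }
}
```

A remote caller that sees NotOwnedException would refresh its view of the partition mapping and retry, while an empty Optional could be reported as a genuine miss.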

Thoughts?

Thanks again,
Steven





Re: Reliably implementing global KeyValueStore#get

2017-06-07 Thread Jan Filipiak

Hi,

have you thought about using connect to put data into a store that is 
more reasonable for your kind of query requirements?


Best Jan

On 07.06.2017 00:29, Steven Schlansker wrote:

On Jun 6, 2017, at 2:52 PM, Damian Guy  wrote:

Steven,

In practice, data shouldn't be migrating that often. If it is then you
probably have bigger problems.

Understood and agreed, but when designing distributed systems, it usually
helps to model for the worst case rather than the "well that should never
happen" case, lest you find yourself fixing those bugs at 3am instead :)

I'd like to be able to induce extreme pain at the Kafka layer (change leader
every 3 seconds and migrate all partitions around randomly) and still have
my app behave correctly.


You should be able to use the metadata api
to find the instance the key should be on and then when you check that node
you can also check with the metadata api that the key should still be on
this host. If streams is rebalancing while you query an exception will be
raised and you'll need to retry the request once the rebalance has
completed.

Agreed here as well.  But let's assume I have a very fast replication
setup (assume it takes zero time, for the sake of argument) -- I'm fairly
sure there's still a race here as this exception only fires *during a migration*
not *after a migration that may have invalidated your metadata lookup completes*


HTH,
Damian

On Tue, 6 Jun 2017 at 18:11 Steven Schlansker 
wrote:


On Jun 6, 2017, at 6:16 AM, Eno Thereska  wrote:

Hi Steven,

Do you know beforehand if a key exists? If you know that and are getting
null() the code will have to retry by refreshing the metadata and going to
the new instance. If you don’t know beforehand if a key exists or not you
might have to check all instances of a store to make sure.

No, I am not presupposing that the key can exist -- this is a user visible
API and will be prone to "accidents" :)

Thanks for the insight.  I worry that even checking all stores is not
truly sufficient, as querying all the different workers at different times
in the presence of migrating data can still in theory miss it given
pessimal execution.

I'm sure I've long wandered off into the hypothetical, but I dream of some
day being cool like Jepsen :)


Eno



On Jun 5, 2017, at 10:12 PM, Steven Schlansker <sschlans...@opentable.com> wrote:

Hi everyone, me again :)

I'm still trying to implement my "remoting" layer that allows
my clients to see the partitioned Kafka Streams state
regardless of which instance they hit.  Roughly, my lookup is:

Message get(Key key) {
  RemoteInstance instance = selectPartition(key);
  return instance.get(key); // http remoting
}

RemoteInstance.get(Key key) { // http endpoint
  return readOnlyKeyValueStore.get(key);
}

However, the mapping of partitions to instances may change.
If you call KeyValueStore.get(K) where K is on a partition you
don't own, it returns null.  This is indistinguishable from a
successful get on a key that doesn't exist.

If one instance selects a sibling instance right as the partition is failing
off of that instance, it may get routed there and by the time it gets
the request no longer "owns" the partition -- returns a false 'null'.

You can try re-checking after you get a null value, but that's susceptible
to the same race -- it's unlikely but possible that the data migrates *back*
before you do this re-check.

Is there any way to correctly implement this without races?  I'd imagine
you need a new primitive like KeyValueStore#get that atomically finds
the key or throws an exception if it is not in an owned partition
at the time of lookup so you know to recheck the partition and retry.

Thoughts?

Thanks again,
Steven







Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-07 Thread Jan Filipiak

Hi Eno,

On 07.06.2017 22:49, Eno Thereska wrote:

Comments inline:


On 5 Jun 2017, at 18:19, Jan Filipiak  wrote:

Hi

just my few thoughts

On 05.06.2017 11:44, Eno Thereska wrote:

Hi there,

Sorry for the late reply, I was out this past week. Looks like good progress 
was made with the discussions either way. Let me recap a couple of points I saw 
into one big reply:

1. Jan mentioned CRC errors. I think this is a good point. As these happen in 
Kafka, before Kafka Streams gets a chance to inspect anything, I'd like to hear 
the opinion of more Kafka folks like Ismael or Jason on this one. Currently the 
documentation is not great with what to do once a CRC check has failed. From 
looking at the code, it looks like the client gets a KafkaException (bubbled up 
from the fetcher) and currently we in streams catch this as part of poll() and 
fail. It might be advantageous to treat CRC handling in a similar way to 
serialisation handling (e.g., have the option to fail/skip). Let's see what the 
other folks say. Worst-case we can do a separate KIP for that if it proved too 
hard to do in one go.

There is no reasonable way to "skip" a CRC error. How can you know the length
you read was anything reasonable? You might be completely lost inside your response.

On the client side, every record received is checked for validity. As it 
happens, if the CRC check fails the exception is wrapped with a KafkaException 
that is thrown all the way to poll(). Assuming we change that and poll() throws 
a CRC exception, I was thinking we could treat it similarly to a deserialize 
exception and pass it to the exception handler to decide what to do. Default 
would be to fail. This might need a Kafka KIP btw and can be done separately 
from this KIP, but Jan, would you find this useful?

I don't think so. IMO you cannot reasonably continue parsing when the
checksum of a message is not correct. If you are not sure you got the
correct length, how can you be sure to find the next record? I would
always fail straight away in all cases. It's too hard for me to understand why
one would try to continue. I mentioned CRCs because those are the only bad
pills I have seen so far. But I am happy that it just stopped and I could
check what was going on. This would also be invasive in the client code.


If you ask me, I am always going to vote for "grind to a halt": let the
developers see what happened and let them fix it. It helps build good
Kafka experiences and better software and architectures. For me this is
"force the user to do the right thing"
(https://youtu.be/aAb7hSCtvGw?t=374), e.g. not letting unexpected input slip
by. Letting unexpected input slip by is what brought us 15+ years of war
with all sorts of ingestion attacks. I don't even dare to estimate how
many missing-records search teams are going to be formed, maybe some HackerOne
for stream apps :D


Best Jan




At a minimum, handling this type of exception will need to involve the 
exactly-once (EoS) logic. We'd still allow the option of failing or skipping, 
but EoS would need to clean up by rolling back all the side effects from the 
processing so far. Matthias, how does this sound?

EoS will not help; the record might be 5 or 6 repartitions down into the topology.
I haven't followed closely, but I pray you made EoS optional! We don't need this, we
don't want this, and we will turn it off if it comes. So I wouldn't recommend
relying on it. The option to turn it off is better than forcing it and still
being unable to roll back bad pills (as explained before).

Yeah as Matthias mentioned EoS is optional.

Thanks,
Eno



6. Will add an end-to-end example as Michael suggested.

Thanks
Eno




On 4 Jun 2017, at 02:35, Matthias J. Sax  wrote:

What I don't understand is this:


From there on it's the easiest way forward: fix, redeploy, start => done

If you have many producers that work fine and a new "bad" producer
starts up and writes bad data into your input topic, your Streams app
dies but all your producers, including the bad one, keep writing.

Thus, how would you fix this, as you cannot "remove" the corrupted data
from the topic? It might take some time to identify the root cause and
stop the bad producer. Up to this point you get good and bad data into
your Streams input topic. If the Streams app is not able to skip over those
bad records, how would you get all the good data from the topic? Not
saying it's not possible, but it's extra work copying the data with a
new non-Streams consumer-producer-app into a new topic and then feeding
your Streams app from this new topic -- you also need to update all your
upstream producers to write to the new topic.

Thus, if you want to fail fast, you can still do this. And after you
detected and fixed the bad producer you might just reconfigure your app
to skip bad records until it reaches the good part of the data.
Afterwards, you 

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-05 Thread Jan Filipiak
 app.

This seems to be much simpler. Or do I miss anything?


Having said this, I agree that the "threshold based callback" might be
questionable. But as you argue for strict "fail-fast", I want to argue
that this is not always the best pattern to apply and that the
overall KIP idea is super useful from my point of view.


-Matthias


On 6/3/17 11:57 AM, Jan Filipiak wrote:

Could not agree more!

But then I think the easiest is still: print exception and die.
From there on it's the easiest way forward: fix, redeploy, start => done

All the other ways to recover a pipeline that was processing partially
all the time and suddenly went over an "I can't take it anymore" threshold
are not straightforward IMO.

How do you find the offset where it became too bad, when it is not the
latest committed one?
How do you reset there? With some reasonable stuff in your RocksDBs?

If one would do the following: the continuing handler would measure against
a threshold and would terminate after that threshold has been passed (per
task). Then one can use the offset commit/flush intervals to make a
reasonable assumption of how much is slipping by + you get an easy recovery
when it gets too bad + you could also account for "in processing" records.

Setting this threshold to zero would cover all cases with 1
implementation. It is still beneficial to have it pluggable.
Again CRC-Errors are the only bad pills we saw in production for now.

Best Jan


On 02.06.2017 17:37, Jay Kreps wrote:

Jan, I agree with you philosophically. I think one practical challenge has
to do with data formats. Many people use untyped events, so there is simply
no guarantee on the form of the input. E.g. many companies use JSON without
any kind of schema so it becomes very hard to assert anything about the
input which makes these programs very fragile to the "one accidental
message publication that creates an unsolvable problem".

For that reason I do wonder if limiting to just serialization actually gets
you a useful solution. For JSON it will help with the problem of
non-parseable JSON, but sounds like it won't help in the case where the
JSON is well-formed but does not have any of the fields you expect and
depend on for your processing. I expect the reason for limiting the scope
is it is pretty hard to reason about correctness for anything that stops in
the middle of processing an operator DAG?

-Jay

On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak 
wrote:


IMHO you're doing it wrong then. + building too much support into the Kafka
ecosystem is very counterproductive in fostering a happy userbase



On 02.06.2017 13:15, Damian Guy wrote:


Jan, you have a choice to Fail fast if you want. This is about giving
people options and there are times when you don't want to fail fast.


On Fri, 2 Jun 2017 at 11:00 Jan Filipiak 
wrote:

Hi

1.
That greatly complicates monitoring. Fail fast gives you that: when you
monitor only the lag of all your apps, you are completely covered. With
that sort of new application, monitoring is very much more complicated, as
you now need to monitor the fail % of some special apps as well. In my
opinion that is a huge downside already.

2.
Using a schema registry like the Avro stuff, it might not even be the record
that is broken; it might be just your app unable to fetch a schema it now
needs to know. Maybe you got partitioned away from that registry.

3. When you get alerted because of a too high fail percentage, what are the
steps you are going to take?
Shut it down to buy time. Fix the problem. Spend way too much time to
find a good reprocess offset.
Your time windows are in bad shape anyway, and you have pretty much lost.
This routine is nonsense.

Dead letter queues would be the worst possible addition to the Kafka
toolkit that I can think of. It just doesn't fit an architecture
in which having clients fall behind is a valid option.

Further, I already mentioned that the only bad pill I've seen so far is CRC
errors. Any plans for those?

Best Jan






On 02.06.2017 11:34, Damian Guy wrote:


I agree with what Matthias has said w.r.t failing fast. There are plenty of
times when you don't want to fail-fast and must attempt to make progress.
The dead-letter queue is exactly for these circumstances. Of course if
every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax wrote:


First a meta comment. KIP discussion should take place on the dev list
-- if user list is cc'ed please make sure to reply to both lists. Thanks.

Thanks for making the scope of the KIP clear. Makes a lot of sense to
focus on deserialization exceptions for now.

With regard to corrupted state stores, would it make sense to fail a
task and wipe out the store to repair it via recreation from the
changelog? That's of course a quite advanced pattern, but I want to bring
it up to design the first step in a way such that we can get there (if
we think it's a reasonable idea).

I also

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-03 Thread Jan Filipiak

Could not agree more!

But then I think the easiest is still: print exception and die.
From there on it's the easiest way forward: fix, redeploy, start => done

All the other ways to recover a pipeline that was processing partially
all the time and suddenly went over an "I can't take it anymore" threshold
are not straightforward IMO.

How do you find the offset where it became too bad, when it is not the
latest committed one?

How do you reset there? With some reasonable stuff in your RocksDBs?

If one would do the following: the continuing handler would measure against
a threshold and would terminate after that threshold has been passed (per
task). Then one can use the offset commit/flush intervals to make a
reasonable assumption of how much is slipping by + you get an easy recovery
when it gets too bad + you could also account for "in processing" records.

Setting this threshold to zero would cover all cases with 1
implementation. It is still beneficial to have it pluggable.
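
One way to read the threshold idea, as a stdlib-only Java sketch. ThresholdFailureHandler, Decision, onFailure and onCommit are hypothetical names and not the handler interface KIP-161 proposes; resetting the counter on each commit is an assumption about how the commit interval would bound "how much is slipping by".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical handler; the names and the reset-on-commit policy are assumptions.
final class ThresholdFailureHandler {
    enum Decision { CONTINUE, FAIL }

    private final int maxFailuresPerTask;
    private final Map<String, Integer> failuresByTask = new HashMap<>();

    ThresholdFailureHandler(int maxFailuresPerTask) {
        this.maxFailuresPerTask = maxFailuresPerTask;
    }

    // Called for every record the task could not process.
    synchronized Decision onFailure(String taskId) {
        int failures = failuresByTask.merge(taskId, 1, Integer::sum);
        // With a threshold of 0 the very first failure already exceeds it: fail-fast.
        return failures > maxFailuresPerTask ? Decision.FAIL : Decision.CONTINUE;
    }

    // Called after a successful offset commit for the task: start a new counting window.
    synchronized void onCommit(String taskId) {
        failuresByTask.remove(taskId);
    }
}
```

Because the counter only covers records since the last commit, a FAIL leaves the committed offset as the natural place to resume from, and a threshold of zero gives plain fail-fast with the same implementation.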


Again CRC-Errors are the only bad pills we saw in production for now.

Best Jan


On 02.06.2017 17:37, Jay Kreps wrote:

Jan, I agree with you philosophically. I think one practical challenge has
to do with data formats. Many people use untyped events, so there is simply
no guarantee on the form of the input. E.g. many companies use JSON without
any kind of schema so it becomes very hard to assert anything about the
input which makes these programs very fragile to the "one accidental
message publication that creates an unsolvable problem".

For that reason I do wonder if limiting to just serialization actually gets
you a useful solution. For JSON it will help with the problem of
non-parseable JSON, but sounds like it won't help in the case where the
JSON is well-formed but does not have any of the fields you expect and
depend on for your processing. I expect the reason for limiting the scope
is it is pretty hard to reason about correctness for anything that stops in
the middle of processing an operator DAG?

-Jay

On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak 
wrote:


IMHO you're doing it wrong then. + building too much support into the Kafka
ecosystem is very counterproductive in fostering a happy userbase



On 02.06.2017 13:15, Damian Guy wrote:


Jan, you have a choice to Fail fast if you want. This is about giving
people options and there are times when you don't want to fail fast.


On Fri, 2 Jun 2017 at 11:00 Jan Filipiak 
wrote:

Hi

1.
That greatly complicates monitoring. Fail fast gives you that: when you
monitor only the lag of all your apps, you are completely covered. With
that sort of new application, monitoring is very much more complicated, as
you now need to monitor the fail % of some special apps as well. In my
opinion that is a huge downside already.

2.
Using a schema registry like the Avro stuff, it might not even be the record
that is broken; it might be just your app unable to fetch a schema it now
needs to know. Maybe you got partitioned away from that registry.

3. When you get alerted because of a too high fail percentage, what are the
steps you are going to take?
Shut it down to buy time. Fix the problem. Spend way too much time to
find a good reprocess offset.
Your time windows are in bad shape anyway, and you have pretty much lost.
This routine is nonsense.

Dead letter queues would be the worst possible addition to the Kafka
toolkit that I can think of. It just doesn't fit an architecture
in which having clients fall behind is a valid option.

Further, I already mentioned that the only bad pill I've seen so far is CRC
errors. Any plans for those?

Best Jan






On 02.06.2017 11:34, Damian Guy wrote:


I agree with what Matthias has said w.r.t failing fast. There are plenty of
times when you don't want to fail-fast and must attempt to make progress.
The dead-letter queue is exactly for these circumstances. Of course if
every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax wrote:


First a meta comment. KIP discussion should take place on the dev list
-- if user list is cc'ed please make sure to reply to both lists. Thanks.

Thanks for making the scope of the KIP clear. Makes a lot of sense to
focus on deserialization exceptions for now.

With regard to corrupted state stores, would it make sense to fail a
task and wipe out the store to repair it via recreation from the
changelog? That's of course a quite advanced pattern, but I want to bring
it up to design the first step in a way such that we can get there (if
we think it's a reasonable idea).

I also want to comment about fail fast vs making progress. I think that
fail-fast is not always the best option. The scenario I have in
mind is like this: you got a bunch of producers that feed the Streams
input topic. Most producers work fine, but maybe one producer
misbehaves and the data it writes is corrupted. You might not even be able
to recover this lost data at any point -

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-02 Thread Jan Filipiak
IMHO you're doing it wrong then. + building too much support into the Kafka
ecosystem is very counterproductive in fostering a happy userbase



On 02.06.2017 13:15, Damian Guy wrote:

Jan, you have a choice to Fail fast if you want. This is about giving
people options and there are times when you don't want to fail fast.


On Fri, 2 Jun 2017 at 11:00 Jan Filipiak  wrote:


Hi

1.
That greatly complicates monitoring. Fail fast gives you that: when you
monitor only the lag of all your apps, you are completely covered. With
that sort of new application, monitoring is very much more complicated, as
you now need to monitor the fail % of some special apps as well. In my
opinion that is a huge downside already.

2.
Using a schema registry like the Avro stuff, it might not even be the record
that is broken; it might be just your app unable to fetch a schema it now
needs to know. Maybe you got partitioned away from that registry.

3. When you get alerted because of a too high fail percentage, what are the
steps you are going to take?
Shut it down to buy time. Fix the problem. Spend way too much time to
find a good reprocess offset.
Your time windows are in bad shape anyway, and you have pretty much lost.
This routine is nonsense.

Dead letter queues would be the worst possible addition to the Kafka
toolkit that I can think of. It just doesn't fit an architecture
in which having clients fall behind is a valid option.

Further, I already mentioned that the only bad pill I've seen so far is CRC
errors. Any plans for those?

Best Jan






On 02.06.2017 11:34, Damian Guy wrote:

I agree with what Matthias has said w.r.t failing fast. There are plenty of
times when you don't want to fail-fast and must attempt to make progress.
The dead-letter queue is exactly for these circumstances. Of course if
every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax wrote:

First a meta comment. KIP discussion should take place on the dev list
-- if user list is cc'ed please make sure to reply to both lists.

Thanks.

Thanks for making the scope of the KIP clear. Makes a lot of sense to
focus on deserialization exceptions for now.

With regard to corrupted state stores, would it make sense to fail a
task and wipe out the store to repair it via recreation from the
changelog? That's of course a quite advanced pattern, but I want to bring
it up to design the first step in a way such that we can get there (if
we think it's a reasonable idea).

I also want to comment about fail fast vs making progress. I think that
fail-fast is not always the best option. The scenario I have in
mind is like this: you got a bunch of producers that feed the Streams
input topic. Most producers work fine, but maybe one producer
misbehaves and the data it writes is corrupted. You might not even be able
to recover this lost data at any point -- thus, there is no reason to
stop processing but you just skip over those records. Of course, you
need to fix the root cause, and thus you need to alert (either via logs
or the exception handler directly) and you need to start to investigate
to find the bad producer, shut it down and fix it.

Here the dead letter queue comes into play. From my understanding, the
purpose of this feature is solely to enable debugging after the fact. I don't
think those records would be fed back at any point in time (so I don't see any
ordering issue -- a skipped record, with this regard, is just "fully
processed"). Thus, the dead letter queue should actually encode the
original record's metadata (topic, partition, offset, etc.) to enable such
debugging. I guess, this might also be possible if you just log the bad
records, but it would be harder to access (you first must find the
Streams instance that did write the log and extract the information from
there). Reading it from topic is much simpler.
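
As an illustration of what "encode the original record's metadata" could look like, here is a small stdlib-only Java sketch of a dead letter envelope. DeadLetter, its fields and describe() are hypothetical names chosen for this example; the KIP does not define such a class.

```java
import java.util.Arrays;

// Hypothetical envelope for a quarantined record; not a Kafka class.
final class DeadLetter {
    final String topic;
    final int partition;
    final long offset;
    final String error;
    private final byte[] rawValue;

    private DeadLetter(String topic, int partition, long offset, String error, byte[] rawValue) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.error = error;
        this.rawValue = rawValue;
    }

    // Keeps the source coordinates and the untouched bytes, so the bad record
    // can be located in the input topic and inspected later.
    static DeadLetter of(String topic, int partition, long offset, byte[] rawValue, Exception cause) {
        return new DeadLetter(topic, partition, offset, cause.toString(),
                Arrays.copyOf(rawValue, rawValue.length));
    }

    // Defensive copy so callers cannot alter the quarantined payload.
    byte[] rawValue() {
        return Arrays.copyOf(rawValue, rawValue.length);
    }

    String describe() {
        return topic + "-" + partition + "@" + offset + ": " + error;
    }
}
```

Serialized into a quarantine topic, an envelope like this would let someone find the offending record by topic, partition and offset without digging through the logs of individual Streams instances.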

I also want to mention the following. Assume you have such a topic with
some bad records and some good records. If we always fail-fast, it's
going to be super hard to process the good data. You would need to write
an extra app that copied the data into a new topic filtering out the bad
records (or apply the map() workaround within Streams). So I don't think
it is necessarily true that failing fast is the best option in
production.

Or do you think there are scenarios, for which you can recover the
corrupted records successfully? And even if this is possible, it might
be a case for reprocessing instead of failing the whole application?
Also, if you think you can "repair" a corrupted record, should the
handler allow to return a "fixed" record? This would solve the ordering
problem.



-Matthias




On 5/30/17 1:47 AM, Michael Noll wrote:

Thanks for your work on this KIP, Eno -- much appreciated!

- I think it would help to improve the KIP by adding an end-to-end code
example that demonstrates, with the DSL and with the Processor API, how the
user w

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-02 Thread Jan Filipiak
w in which part of the topology the
record failed? `ConsumerRecord` gives us access to topic, partition,
offset, timestamp, etc., but what about topology-related information (e.g.
what is the associated state store, if any)?

- Only partly on-topic for the scope of this KIP, but this is about the
bigger picture: This KIP would give users the option to send corrupted
records to dead letter queue (quarantine topic).  But, what pattern would
we advocate to process such a dead letter queue then, e.g. how to allow for
retries with backoff ("If the first record in the dead letter queue fails
again, then try the second record for the time being and go back to the
first record at a later time").  Jay and Jan already alluded to ordering
problems that will be caused by dead letter queues. As I said, retries
might be out of scope but perhaps the implications should be considered if
possible?

Also, I wrote the text below before reaching the point in the conversation
that this KIP's scope will be limited to exceptions in the category of
poison pills / deserialization errors.  But since Jay brought up user code
errors again, I decided to include it again.

snip
A meta comment: I am not sure about this split between the code for the
happy path (e.g. map/filter/... in the DSL) from the failure path (using
exception handlers).  In Scala, for example, we can do:

 scala> val computation = scala.util.Try(1 / 0)
 computation: scala.util.Try[Int] = Failure(java.lang.ArithmeticException: / by zero)

 scala> computation.getOrElse(42)
 res2: Int = 42

Another example with Scala's pattern matching, which is similar to
`KStream#branch()`:

 computation match {
   case scala.util.Success(x) => x * 5
   case scala.util.Failure(_) => 42
 }

(The above isn't the most idiomatic way to handle this in Scala, but that's
not the point I'm trying to make here.)

Hence the question I'm raising here is: Do we want to have an API where you
code "the happy path", and then have a different code path for failures
(using exceptions and handlers);  or should we treat both Success and
Failure in the same way?

I think the failure/exception handling approach (as proposed in this KIP)
is well-suited for errors in the category of deserialization problems aka
poison pills, partly because the (default) serdes are defined through
configuration (explicit serdes however are defined through API calls).

However, I'm not yet convinced that the failure/exception handling approach
is the best idea for user code exceptions, e.g. if you fail to guard
against NPE in your lambdas or divide a number by zero.

 scala> val stream = Seq(1, 2, 3, 4, 5)
 stream: Seq[Int] = List(1, 2, 3, 4, 5)

 // Here: Fallback to a sane default when encountering failed records
 scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
 res19: Seq[Int] = List(0, 1, 42, -1, 0)

 // Here: Skip over failed records
 scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
 res20: Seq[Int] = List(0, 1, -1, 0)

The above is more natural to me than using error handlers to define how to
deal with failed records (here, the value `3` causes an arithmetic
exception).  Again, it might help the KIP if we added an end-to-end example
for such user code errors.
snip
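
For readers more at home in Java than Scala, roughly the same two strategies (fall back to a default, or skip the failed record) can be sketched with Optional and plain java.util streams. SafeMap, attempt, withFallback and skippingFailures are hypothetical helper names for this illustration and have nothing to do with the Kafka Streams DSL.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;

// Hypothetical helpers mirroring the Scala Try examples; not a Kafka Streams API.
final class SafeMap {
    // Runs f and turns any runtime failure into an empty Optional.
    static Optional<Integer> attempt(IntUnaryOperator f, int input) {
        try {
            return Optional.of(f.applyAsInt(input));
        } catch (RuntimeException e) {
            return Optional.empty();
        }
    }

    // Failed records are replaced by a fallback value.
    static List<Integer> withFallback(List<Integer> in, IntUnaryOperator f, int fallback) {
        return in.stream()
                .map(x -> attempt(f, x).orElse(fallback))
                .collect(Collectors.toList());
    }

    // Failed records are dropped from the output.
    static List<Integer> skippingFailures(List<Integer> in, IntUnaryOperator f) {
        return in.stream()
                .map(x -> attempt(f, x))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }
}
```

Called with the inputs 1 to 5 and the function x -> 1 / (3 - x), the two helpers would treat the failing input 3 the same way as the getOrElse(42) and collect variants in the Scala session.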




On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak 
wrote:


Hi Jay,

Eno mentioned that he will narrow down the scope to only ConsumerRecord
deserialisation.

I am working with database changelogs only. I would really not like to see
a dead letter queue or something similar. How am I expected to get these
back in order? Just grind to a halt and call me on the weekend. I'd rather
fix it then in a few minutes than spend 2 weeks ordering dead letters
(where reprocessing might even be the faster fix).

Best Jan




On 29.05.2017 20:23, Jay Kreps wrote:


 - I think we should hold off on retries unless we have worked out the
 full usage pattern, people can always implement their own. I think the idea
 is that you send the message to some kind of dead letter queue and then
 replay these later. This obviously destroys all semantic guarantees we are
 working hard to provide right now, which may be okay.









Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-30 Thread Jan Filipiak

Hi Jay,

Eno mentioned that he will narrow down the scope to only ConsumerRecord 
deserialisation.


I am working with database changelogs only. I would really not like to
see a dead letter queue or something similar. How am I expected to get
these back in order? Just grind to a halt and call me on the weekend. I'll
fix it then in a few minutes rather than spend 2 weeks ordering dead
letters (where reprocessing might even be the faster fix).


Best Jan



On 29.05.2017 20:23, Jay Kreps wrote:

- I think we should hold off on retries unless we have worked out the
full usage pattern, people can always implement their own. I think the idea
is that you send the message to some kind of dead letter queue and then
replay these later. This obviously destroys all semantic guarantees we are
working hard to provide right now, which may be okay.




Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-28 Thread Jan Filipiak

+1

On 26.05.2017 18:36, Damian Guy wrote:

In that case, though, every access to that key is doomed to failure as the
database is corrupted. So I think it should probably die in a steaming heap
at that point!

On Fri, 26 May 2017 at 17:33 Eno Thereska  wrote:


Hi Damian,

I was thinking of cases when there is bit-rot on the storage itself and we
get a malformed record that cannot be de-serialized. There is an
interesting intersection here with CRCs in both Kafka (already there, they
throw on deserialization) and potentially local storage (we don't have CRCs
here on the data files, though RocksDB has them on its write-ahead log
records).

Basically in a nutshell, I'm saying that every deserialization exception
should go through this new path. The user can decide to fail or continue.
We could start with just poison pills from Kafka though and punt the
storage one to later.

Eno


On 26 May 2017, at 16:59, Damian Guy  wrote:

Eno,

Under what circumstances would you get a deserialization exception from the
state store? I can only think of the case where someone has provided a bad
deserializer to a method that creates a state store. In which case it would
be a user error and probably should just abort?

Thanks,
Damian

On Fri, 26 May 2017 at 16:32 Eno Thereska 

wrote:

See latest reply to Jan's note. I think I unnecessarily broadened the
scope of this KIP to the point where it sounded like it handles all sorts
of exceptions. The scope should be strictly limited to "poison pill"
records for now. Will update KIP,

Thanks
Eno

On 26 May 2017, at 16:16, Matthias J. Sax 

wrote:

"bad" for this case would mean, that we got an
`DeserializationException`. I am not sure if any other processing error
should be covered?

@Eno: this raises one question. Might it be better to allow for two
handlers instead of one? One for deserialization exceptions and one for
all other exceptions from user code?

Just a thought.


-Matthias

On 5/26/17 7:49 AM, Jim Jagielski wrote:

On May 26, 2017, at 5:13 AM, Eno Thereska 

wrote:




With regard to `DeserializationException`, do you think it might make
sense to have a "dead letter queue" as a feature to provide out-of-the-box?

We could provide a special topic where bad messages go to, and then
we'd have to add a config option for the user to provide a topic. Is that
what you're thinking?

For various definitions of "bad"??









Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-26 Thread Jan Filipiak

Hi Eno,

that makes a lot more sense to me. When you pop stuff out of a topic,
you can at least also put the coordinates (topic partition, offset)
into the log, which is probably kind of nice: you can just fetch the
record from the CLI and check what's going on.
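
To make the logging idea concrete, here is a minimal sketch of a handler that writes the record coordinates to a log before continuing. Everything in it is an assumption for illustration: `RecordCoordinates`, the in-memory `LOG` list and this `handle` signature are stand-ins and not the interface proposed in the KIP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the types discussed in the thread; these are
// not the Kafka Streams API.
final class PoisonPillLoggingSketch {

    enum HandlerResponse { CONTINUE, FAIL }

    // Minimal record coordinates: enough to fetch the raw message again later.
    static final class RecordCoordinates {
        final String topic;
        final int partition;
        final long offset;

        RecordCoordinates(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        String describe() {
            return topic + "-" + partition + "@" + offset;
        }
    }

    // Collects log lines in memory so the sketch stays self-contained.
    static final List<String> LOG = new ArrayList<>();

    // Logs where the bad record lives, then tells the caller to skip it.
    static HandlerResponse handle(RecordCoordinates coordinates, Exception exception) {
        LOG.add("skipping undeserializable record at " + coordinates.describe()
                + ": " + exception.getMessage());
        return HandlerResponse.CONTINUE;
    }

    public static void main(String[] args) {
        HandlerResponse response = handle(
                new RecordCoordinates("orders", 2, 1337L),
                new IllegalStateException("bad checksum"));
        System.out.println(response + " -> " + LOG.get(0));
    }
}
```

With topic, partition and offset in the log line, the offending message can be fetched again by hand with whatever consumer tooling is available.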


One additional question:

Is this handler only going to cover Serde exceptions, or MessageSet
iterator exceptions as well? I am speaking of checksum errors. We can't
rely on the deserializer to properly throw when we hand it data with a bad
checksum, and checksum errors are the only poison pills I have seen in
production up to this point.


Best Jan


On 26.05.2017 17:31, Eno Thereska wrote:

Hi Jan,

You're right. I think I got carried away and broadened the scope of this KIP beyond its
original purpose. This handler will only be there for deserialization errors, i.e.,
"poison pills", and is not intended to be a catch-all handler for all sorts of
other problems (e.g., an NPE in user code). Deserialization errors can happen
either when polling or when deserializing from a state store. So that narrows down the
scope of the KIP; I will update it.

Thanks
Eno


On 26 May 2017, at 11:31, Jan Filipiak  wrote:

Hi

unfortunately no. Think about "caching" and these records popping out of there, or
multi-step tasks (join, aggregate, repartition all in one go): the last repartitioner might
throw because it can't determine the partition, only because a get on the join store caused a
flush through the aggregates. This has nothing to do with a ConsumerRecord at all,
especially not the one we most recently processed.

To be completely honest, anything but grinding to a halt is not appealing to me at
all. Sure, maybe lag monitoring will call me on Sunday, but I can at least be
confident it's working the rest of the time.

Best Jan

PS.:

Hope you get my point. I am mostly complaining about

public interface RecordExceptionHandler {
    /**
     * Inspect a record and the exception received
     */
    HandlerResponse handle(/* that guy here >>>>>>> */ ConsumerRecord<byte[], byte[]> record,
                           Exception exception);
}

public enum HandlerResponse {
    /* continue with processing */
    CONTINUE(1),
    /* fail the processing and stop */
    FAIL(2);
}



On 26.05.2017 11:18, Eno Thereska wrote:

Thanks Jan,

The record passed to the handler will always be the problematic record. There 
are 2 cases/types of exceptions for the purposes of this KIP: 1) any exception 
during deserialization. The bad record + the exception (i.e. 
DeserializeException) will be passed to the handler. The handler will be able 
to tell this was a deserialization error.
2) any exception during processing of this record. So whenever a processor gets 
the record (after some caching, etc) it starts to process it, then it fails, 
then it will call the handler with this record.

Does that match your thinking?

Thanks,
Eno



On 26 May 2017, at 09:51, Jan Filipiak  wrote:

Hi,

quick question: from the KIP it doesn't quite make sense to me how this fits
with caching.
With caching, the consumer record might not be related at all to the processor
that throws while processing.

Would it not make more sense to get the ProcessorName + Object, Object for
processing, and the state store or topic name + byte[], byte[] for serializers?
Maybe also passing in the serdes used?

Best Jan



On 25.05.2017 11:47, Eno Thereska wrote:

Hi there,

I’ve added a KIP on improving exception handling in streams:
KIP-161: streams record processing exception handlers. 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+record+processing+exception+handlers
 
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-161:+streams+record+processing+exception+handlers>

Discussion and feedback is welcome, thank you.
Eno




Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-26 Thread Jan Filipiak

Hi

unfortunately no. Think about "caching" and these records popping out of
there, or multi-step tasks (join, aggregate, repartition all in one go):
the last repartitioner might throw because it can't determine the
partition, only because a get on the join store caused a flush through the
aggregates. This has nothing to do with a ConsumerRecord at all,
especially not the one we most recently processed.


To be completely honest, anything but grinding to a halt is not appealing
to me at all. Sure, maybe lag monitoring will call me on Sunday, but I can
at least be confident it's working the rest of the time.


Best Jan

PS.:

Hope you get my point. I am mostly complaining about

public interface RecordExceptionHandler {
    /**
     * Inspect a record and the exception received
     */
    HandlerResponse handle(/* that guy here >>>>>>> */ ConsumerRecord<byte[], byte[]> record,
                           Exception exception);
}

public enum HandlerResponse {
    /* continue with processing */
    CONTINUE(1),
    /* fail the processing and stop */
    FAIL(2);
}



On 26.05.2017 11:18, Eno Thereska wrote:

Thanks Jan,

The record passed to the handler will always be the problematic record. There 
are 2 cases/types of exceptions for the purposes of this KIP: 1) any exception 
during deserialization. The bad record + the exception (i.e. 
DeserializeException) will be passed to the handler. The handler will be able 
to tell this was a deserialization error.
2) any exception during processing of this record. So whenever a processor gets 
the record (after some caching, etc) it starts to process it, then it fails, 
then it will call the handler with this record.

Does that match your thinking?

Thanks,
Eno



On 26 May 2017, at 09:51, Jan Filipiak  wrote:

Hi,

quick question: from the KIP it doesn't quite make sense to me how this fits
with caching.
With caching, the consumer record might not be related at all to the processor
that throws while processing.

Would it not make more sense to get the ProcessorName + Object, Object for
processing, and the state store or topic name + byte[], byte[] for serializers?
Maybe also passing in the serdes used?

Best Jan



On 25.05.2017 11:47, Eno Thereska wrote:

Hi there,

I’ve added a KIP on improving exception handling in streams:
KIP-161: streams record processing exception handlers. 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+record+processing+exception+handlers
 
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-161:+streams+record+processing+exception+handlers>

Discussion and feedback is welcome, thank you.
Eno




Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-26 Thread Jan Filipiak

Hi,

quick question: from the KIP it doesn't quite make sense to me how this
fits with caching.
With caching, the consumer record might not be related at all to the
processor that throws while processing.


Would it not make more sense to get the ProcessorName + Object, Object
for processing, and the state store or topic name + byte[], byte[] for
serializers? Maybe also passing in the serdes used?


Best Jan



On 25.05.2017 11:47, Eno Thereska wrote:

Hi there,

I’ve added a KIP on improving exception handling in streams:
KIP-161: streams record processing exception handlers. 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+record+processing+exception+handlers
 


Discussion and feedback is welcome, thank you.
Eno




ThoughtWorks Tech Radar: Assess Kafka Streams

2017-03-29 Thread Jan Filipiak

Regardless of how useful you find the tech radar:

Well deserved! Even though we all here agree that trial or adopt is within reach.

https://www.thoughtworks.com/radar/platforms/kafka-streams

Best Jan




Re: Implementing a non-key in Kafka Streams using the Processor API

2017-02-21 Thread Jan Filipiak

Hi,

yeah, the proposed solution is doable completely in the DSL (the only real
constraint is not having a parent key with lots of children), except for
the lateral view, which is a pretty easy thing in PAPI.

Our own implementation is a mix of reusing DSL interfaces and using
reflection against KTableImpl to drop down to PAPI. That is probably one
limiting factor in why I am not that eager to share it publicly, because
it's kind of ugly. The current development (removing many features from
PAPI) is very worrisome for me, so I should get moving on having upstream
support.


Regarding the output key, we forced the user to pick a combined key
parent+child_id. This works out pretty nicely, as you get access to the
partition information in the partitioner in the delete cases as well. On
the receiving side you can use just a regular KTableSource to materialize
and automatically have the parent key as a prefix. It will also do the
naturally correct thing if you update parent_id in the child table.
Upstream support would also be helpful, as the state stores are
changelogged even though we could use the intermediate topic for state
store high availability.
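
A rough sketch of the combined-key idea, using plain JDK collections instead of Kafka Streams classes. The `|` separator, the hash-based `partitionFor` and the `TreeMap` standing in for a sorted state store are all assumptions for illustration, not our actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class CombinedKeySketch {

    // Hypothetical separator; assumed never to occur inside a parent id.
    static final char SEPARATOR = '|';

    static String combinedKey(String parentId, String childId) {
        return parentId + SEPARATOR + childId;
    }

    // Only the parent part decides the partition, so a delete (a record with
    // a null value) for a child still lands on the partition of its parent.
    static int partitionFor(String combinedKey, int numPartitions) {
        String parentId = combinedKey.substring(0, combinedKey.indexOf(SEPARATOR));
        return Math.floorMod(parentId.hashCode(), numPartitions);
    }

    // Prefix scan over a sorted store: all children of one parent are adjacent.
    static List<String> childrenOf(TreeMap<String, String> store, String parentId) {
        String from = parentId + SEPARATOR;
        String to = parentId + (char) (SEPARATOR + 1);
        return new ArrayList<>(store.subMap(from, true, to, false).values());
    }

    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();
        store.put(combinedKey("p1", "c1"), "a");
        store.put(combinedKey("p1", "c2"), "b");
        store.put(combinedKey("p2", "c1"), "c");
        System.out.println(childrenOf(store, "p1"));
        System.out.println(partitionFor("p1|c1", 4) == partitionFor("p1|c2", 4));
    }
}
```

The point of the prefix is that the key alone carries everything the partitioner needs, even when the value is gone.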


Best Jan

On 21.02.2017 20:15, Guozhang Wang wrote:

Jan,

Sure I would love to hear what you did for non-key joins. Last time we
chatted there are discussions on the ordering issue, that we HAVE TO
augment the join result stream keys as a combo of both, which may not be
elegant as used in the DSL.

For your proposed solution, it seems you did not do that on the DSL but at
the PAPI layer, right?

Guozhang

On Tue, Feb 21, 2017 at 6:05 AM, Jan Filipiak 
wrote:


Just a little note here:

if you can take all rows of the "children" table for each key into memory,
you can get away with using group_by and making a list of them. With this
aggregation the join is straightforward, and you can use a lateral view
later to get to the same result. For this you could use the current DSL to
a greater extent.

Best Jan

On 21.02.2017 13:10, Frank Lyaruu wrote:


I've read that JIRA (although I don't understand every single thing), and I
got the feeling it is not exactly the same problem.
I am aware of the Global Tables, and I've tried that first, but I seem
unable to do what I need to do.

I'm replicating a relational database, and on a one-to-many relationship
I'd like to publish a joined message if either of the source streams
receives an update.

In the Global Table Wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams

I see this:
"The GlobalKTable will only be used for doing lookups. That is, data
arriving in the GlobalKTable will not trigger the join. "

So how would I go about doing this?
regards, Frank



On Tue, Feb 21, 2017 at 10:38 AM, Eno Thereska 
wrote:

Hi Frank,

As far as I know the design in that wiki has been superseded by the Global
KTables design which is now coming in 0.10.2. Hence, the JIRAs that are
mentioned there (like KAFKA-3705). There are some extensive comments in
https://issues.apache.org/jira/browse/KAFKA-3705 illustrating why this
design is particularly challenging and why Global KTables was chosen
instead. I'm not sure if you still want to pursue that original design,
since it is not proven to work.

Guozhang, perhaps we need to add a note saying that Global KTables is the
new design?

Thanks
Eno

On 21 Feb 2017, at 07:35, Frank Lyaruu  wrote:

Hi all,

I'm trying to implement joining two Kafka tables using a 'remote' key,
basically as described here:

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins


Under the "Implementation Details" there is one line I don't know how to
do:

1. First of all, we will repartition this KTable's stream, by key
computed from the *mapper(K, V) → K1*, so that it is co-partitioned by
the same key. The co-partition topic is partitioned on the new key,
but the message key and value are unchanged, and log compaction is
turned off.

How do I do that? I've been unable to find any documentation, I've looked
at the StreamPartitionAssignor, that seems relevant, but I could use some
help. Does anyone have an example?

regards, Frank









Re: Implementing a non-key in Kafka Streams using the Processor API

2017-02-21 Thread Jan Filipiak

Just a little note here:

if you can take all rows of the "children" table for each key into
memory, you can get away with using group_by and making a list of them.
With this aggregation the join is straightforward, and you can use a
lateral view later to get to the same result. For this you could use the
current DSL to a greater extent.
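
To illustrate the shape of it, here is a sketch with plain Java collections rather than the Kafka Streams DSL. The `Child` class and the method names are made up for the example: children are grouped into one list per parent key, the join is then an ordinary key-to-key lookup, and the "lateral view" is just the flattening of that list back into one row per child.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupThenFlattenSketch {

    // Hypothetical child row carrying a foreign key to its parent.
    static final class Child {
        final String id;
        final String parentId;
        final String value;

        Child(String id, String parentId, String value) {
            this.id = id;
            this.parentId = parentId;
            this.value = value;
        }
    }

    // Step 1: aggregate the children into one list per parent key.
    static Map<String, List<Child>> groupByParent(List<Child> children) {
        Map<String, List<Child>> grouped = new LinkedHashMap<>();
        for (Child child : children) {
            grouped.computeIfAbsent(child.parentId, k -> new ArrayList<>()).add(child);
        }
        return grouped;
    }

    // Steps 2 and 3: key-to-key inner join against the parents, then flatten
    // the joined list into one output row per child (the "lateral view").
    static List<String> joinAndFlatten(Map<String, String> parents,
                                       Map<String, List<Child>> grouped) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, String> parent : parents.entrySet()) {
            List<Child> children = grouped.get(parent.getKey());
            if (children == null) {
                continue; // no children for this parent: no output rows
            }
            for (Child child : children) {
                rows.add(parent.getValue() + ":" + child.value);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, String> parents = new LinkedHashMap<>();
        parents.put("p1", "order-1");
        parents.put("p2", "order-2");
        List<Child> children = new ArrayList<>();
        children.add(new Child("c1", "p1", "apple"));
        children.add(new Child("c2", "p1", "pear"));
        children.add(new Child("c3", "p2", "plum"));
        System.out.println(joinAndFlatten(parents, groupByParent(children)));
    }
}
```

The memory caveat is visible here: each parent's whole child list is a single value, which is why this only works when no parent has too many children.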


Best Jan

On 21.02.2017 13:10, Frank Lyaruu wrote:

I've read that JIRA (although I don't understand every single thing), and I
got the feeling it is not exactly the same problem.
I am aware of the Global Tables, and I've tried that first, but I seem
unable to do what I need to do.

I'm replicating a relational database, and on a one-to-many relationship
I'd like to publish a joined message if either of the source streams
receives an update.

In the Global Table Wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams

I see this:
"The GlobalKTable will only be used for doing lookups. That is, data
arriving in the GlobalKTable will not trigger the join. "

So how would I go about doing this?
regards, Frank



On Tue, Feb 21, 2017 at 10:38 AM, Eno Thereska 
wrote:


Hi Frank,

As far as I know the design in that wiki has been superseded by the Global
KTables design which is now coming in 0.10.2. Hence, the JIRAs that are
mentioned there (like KAFKA-3705). There are some extensive comments in
https://issues.apache.org/jira/browse/KAFKA-3705 <
https://issues.apache.org/jira/browse/KAFKA-3705> illustrating why this
design is particularly challenging and why Global KTables was chosen
instead. I'm not sure if you still want to pursue that original design,
since it is not proven to work.

Guozhang, perhaps we need to add a note saying that Global KTables is the
new design?

Thanks
Eno


On 21 Feb 2017, at 07:35, Frank Lyaruu  wrote:

Hi all,

I'm trying to implement joining two Kafka tables using a 'remote' key,
basically as described here:

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins

Under the "Implementation Details" there is one line I don't know how to
do:


   1. First of all, we will repartition this KTable's stream, by key
   computed from the *mapper(K, V) → K1*, so that it is co-partitioned by
   the same key. The co-partition topic is partitioned on the new key,
   but the message key and value are unchanged, and log compaction is
   turned off.


How do I do that? I've been unable to find any documentation, I've looked
at the StreamPartitionAssignor, that seems relevant, but I could use some
help. Does anyone have an example?

regards, Frank






Re: Implementing a non-key in Kafka Streams using the Processor API

2017-02-21 Thread Jan Filipiak

Hi,

yes, the ticket is exactly about what you want to do. The lengthy
discussion is mainly about what the key of the output KTable is.


@Guozhang, would you be interested in seeing what we did so far?

best Jan

On 21.02.2017 13:10, Frank Lyaruu wrote:

I've read that JIRA (although I don't understand every single thing), and I
got the feeling it is not exactly the same problem.
I am aware of the Global Tables, and I've tried that first, but I seem
unable to do what I need to do.

I'm replicating a relational database, and on a one-to-many relationship
I'd like to publish a joined message if either of the source streams
receives an update.

In the Global Table Wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams

I see this:
"The GlobalKTable will only be used for doing lookups. That is, data
arriving in the GlobalKTable will not trigger the join. "

So how would I go about doing this?
regards, Frank



On Tue, Feb 21, 2017 at 10:38 AM, Eno Thereska 
wrote:


Hi Frank,

As far as I know the design in that wiki has been superseded by the Global
KTables design which is now coming in 0.10.2. Hence, the JIRAs that are
mentioned there (like KAFKA-3705). There are some extensive comments in
https://issues.apache.org/jira/browse/KAFKA-3705 <
https://issues.apache.org/jira/browse/KAFKA-3705> illustrating why this
design is particularly challenging and why Global KTables was chosen
instead. I'm not sure if you still want to pursue that original design,
since it is not proven to work.

Guozhang, perhaps we need to add a note saying that Global KTables is the
new design?

Thanks
Eno


On 21 Feb 2017, at 07:35, Frank Lyaruu  wrote:

Hi all,

I'm trying to implement joining two Kafka tables using a 'remote' key,
basically as described here:

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins

Under the "Implementation Details" there is one line I don't know how to
do:


   1. First of all, we will repartition this KTable's stream, by key
   computed from the *mapper(K, V) → K1*, so that it is co-partitioned by
   the same key. The co-partition topic is partitioned on the new key,
   but the message key and value are unchanged, and log compaction is
   turned off.


How do I do that? I've been unable to find any documentation, I've looked
at the StreamPartitionAssignor, that seems relevant, but I could use some
help. Does anyone have an example?

regards, Frank






Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-08 Thread Jan Filipiak

Hi,

Just my few thoughts:

Does it need to be JSON?
The old zkOffset tool had a nice format:
very easy to manipulate on the CLI,
very powerful: it changes as many consumer groups/topics/partitions in one
go as you want.


Maybe allow -1 and -2 to indicate earliest and latest reset, regardless
of what the group has as its auto mechanism.


I would definitely prefer a line-oriented format rather than JSON. I
ramped up my https://stedolan.github.io/jq/ skills so I can do some
partition assignments, but it's no joy; better grep, awk, ...
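
To show what I mean by line-oriented, here is a small parsing sketch. The format itself (`group topic partition offset`, whitespace separated) is purely hypothetical and is neither the old tool's format nor anything from the KIP; the sentinel values follow the broker convention as I understand it, -2 for earliest and -1 for latest.

```java
final class OffsetLineFormatSketch {

    // Sentinels (assumption: same meaning as in Kafka's offset lookups).
    static final long LATEST = -1L;
    static final long EARLIEST = -2L;

    static final class OffsetReset {
        final String group;
        final String topic;
        final int partition;
        final long offset;

        OffsetReset(String group, String topic, int partition, long offset) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Parses one line of the hypothetical format: group topic partition offset
    static OffsetReset parseLine(String line) {
        String[] fields = line.trim().split("\\s+");
        if (fields.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + line);
        }
        return new OffsetReset(fields[0], fields[1],
                Integer.parseInt(fields[2]), Long.parseLong(fields[3]));
    }

    // Human-readable target of the reset.
    static String target(OffsetReset reset) {
        if (reset.offset == EARLIEST) {
            return "earliest";
        }
        if (reset.offset == LATEST) {
            return "latest";
        }
        return Long.toString(reset.offset);
    }

    public static void main(String[] args) {
        OffsetReset reset = parseLine("my-group clicks 0 -2");
        System.out.println(reset.group + " " + reset.topic + "-" + reset.partition
                + " -> " + target(reset));
    }
}
```

One record per line means grep, awk and sed can select or rewrite many groups, topics and partitions in one go.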

Best Jan

On 08.02.2017 03:43, Jorge Esteban Quilcate Otoya wrote:

Hi all,

I would like to propose a KIP to Add a tool to Reset Consumer Group Offsets.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+a+tool+to+Reset+Consumer+Group+Offsets

Please, take a look at the proposal and share your feedback.

Thanks,
Jorge.





Re: At Least Once semantics for Kafka Streams

2017-02-03 Thread Jan Filipiak

Hey,

with a little more effort you can try to make your stream application
idempotent, maybe giving you the same results. Say you want to aggregate
a KStream by some key. Instead of keeping the aggregate, you keep a Set of
raw values and then do the aggregate calculations with a map().

This is much more resource-intensive, as you get O(n^2) growth in state
change logs etc., and the framework does not really help you with that.
With caching you can hopefully spare a good chunk of the quadratic
behaviour, but it can act as exactly-once processing.
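
A minimal sketch of the idea with plain Java collections, not Kafka Streams state stores. One assumption I am adding: every raw value carries a unique event id, because otherwise two legitimately distinct events with the same amount would collapse into one. The class and method names are made up for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class IdempotentAggregateSketch {

    // State per key: the raw events seen so far, keyed by event id.
    private final Map<String, Map<String, Long>> rawEvents = new HashMap<>();

    // Re-delivery of the same event id overwrites itself, so processing a
    // record twice leaves the state unchanged.
    void accept(String key, String eventId, long amount) {
        rawEvents.computeIfAbsent(key, k -> new HashMap<>()).put(eventId, amount);
    }

    // The aggregate is derived from the raw events on demand instead of
    // being stored and updated incrementally.
    long sum(String key) {
        long total = 0;
        Map<String, Long> events =
                rawEvents.getOrDefault(key, Collections.<String, Long>emptyMap());
        for (long amount : events.values()) {
            total += amount;
        }
        return total;
    }

    public static void main(String[] args) {
        IdempotentAggregateSketch state = new IdempotentAggregateSketch();
        state.accept("user-1", "e1", 5);
        state.accept("user-1", "e2", 7);
        state.accept("user-1", "e1", 5); // duplicate delivery after a restart
        System.out.println(state.sum("user-1"));
    }
}
```

The cost is visible too: the state per key grows with every event, and each update rewrites the whole collection to the changelog, which is where the quadratic behaviour comes from.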



Best Jan

On 30.01.2017 05:13, Mahendra Kariya wrote:

Hey All,

I am new to Kafka streams. From the documentation, it is pretty much clear
that streams support at least once semantics. But I couldn't find details
about how this is supported. I am interested in knowing the finer details /
design of this.

Is there some documentation around this?
Is there some documentation around what semantics are followed by the
various Kafka streams examples available on Github? Do all of them follow
at least once?


Thanks,
Mahendra





Re: kafka-consumer-offset-checker complaining about NoNode for X in zk

2017-02-02 Thread Jan Filipiak

Hi,

sorry, and use the consumer group tool instead of the offset checker.


On 02.02.2017 20:08, Jan Filipiak wrote:

Hi,

 if it's a Kafka Streams app, it's most likely going to store its offsets
in Kafka rather than ZooKeeper.

You can use the --new-consumer option to check for Kafka-stored offsets.

Best Jan


On 01.02.2017 21:14, Ara Ebrahimi wrote:

Hi,

For a subset of our topics we get this error:

$KAFKA_HOME/bin/kafka-consumer-offset-checker.sh --group 
argyle-streams --topic topic_name --zookeeper $ZOOKEEPERS
[2017-02-01 12:08:56,115] WARN WARNING: ConsumerOffsetChecker is 
deprecated and will be dropped in releases following 0.9.0. Use 
ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: 
KeeperErrorCode = NoNode for 
/consumers/streams-app/offsets/topic_name/2.


All topics are created via the same process but a few of them report 
this error.


When I check in zk there's nothing under /consumers:

zookeeper-client -server $ZOOKEEPERS -cmd ls /consumers

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[]

But the kafka-consumer-offset-checker does work for many topics 
anyway, and it fails for a few.


Why does this happen? How can I fix it?

Thanks,
Ara.





This message is for the designated recipient only and may contain 
privileged, proprietary, or otherwise confidential information. If 
you have received it in error, please notify the sender immediately 
and delete the original. Any other use of the e-mail by you is 
prohibited. Thank you in advance for your cooperation.









Re: kafka-consumer-offset-checker complaining about NoNode for X in zk

2017-02-02 Thread Jan Filipiak

Hi,

 if it's a Kafka Streams app, it's most likely going to store its offsets
in Kafka rather than ZooKeeper.

You can use the --new-consumer option to check for Kafka-stored offsets.

Best Jan


On 01.02.2017 21:14, Ara Ebrahimi wrote:

Hi,

For a subset of our topics we get this error:

$KAFKA_HOME/bin/kafka-consumer-offset-checker.sh --group argyle-streams --topic 
topic_name --zookeeper $ZOOKEEPERS
[2017-02-01 12:08:56,115] WARN WARNING: ConsumerOffsetChecker is deprecated and 
will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. 
(kafka.tools.ConsumerOffsetChecker$)
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: 
KeeperErrorCode = NoNode for /consumers/streams-app/offsets/topic_name/2.

All topics are created via the same process but a few of them report this error.

When I check in zk there's nothing under /consumers:

zookeeper-client -server $ZOOKEEPERS -cmd ls /consumers

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[]

But the kafka-consumer-offset-checker does work for many topics anyway, and it 
fails for a few.

Why does this happen? How can I fix it?

Thanks,
Ara.





This message is for the designated recipient only and may contain privileged, 
proprietary, or otherwise confidential information. If you have received it in 
error, please notify the sender immediately and delete the original. Any other 
use of the e-mail by you is prohibited. Thank you in advance for your 
cooperation.






Re: Does KafkaStreams be configured to use multiple broker clusters in the same topology

2017-02-02 Thread Jan Filipiak

Sometimes I wake up cause I dreamed that this had gone down:

https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics




On 02.02.2017 19:07, Roger Vandusen wrote:

Ah, yes, I see your point and use case, thanks for the feedback.

On 2/2/17, 11:02 AM, "Damian Guy"  wrote:

 Hi Roger,
 
 The problem is that you can't do it async and still guarantee at-least-once
 delivery. For example:
 if your streams app looked something like this:
 
 builder.stream("input").mapValues(...).process(yourCustomerProcessSupplier);
 
 On the commit interval, kafka streams will commit the consumed offsets for
 the topic "input". Now if you do an async call in process, there is no
 guarantee that the message has been delivered. The broker might fail, there
 may be some other transient error. So you can end up dropping messages as
 the consumer has committed the offset of the source topic, but the receiver
 has not actually received it.
 
 Does that make sense?
 
 Thanks,

 Damian
 
 On Thu, 2 Feb 2017 at 17:56 Roger Vandusen 

 wrote:
 
 > Damian,

 >
 > We could lessen the producer.send(..).get() impact on throughput by 
simply
 > handing it off to another async worker component in our springboot app, 
any
 > feedback on that?
 >
 > -Roger
 >
 > On 2/2/17, 10:35 AM, "Damian Guy"  wrote:
 >
 > Hi, yes you could attach a custom processor that writes to another
 > Kafka
 > cluster. The problem is going to be guaranteeing at least once 
delivery
 > without impacting throughput. To guarantee at least once you would
 > need to
 > do a blocking send on every call to process, i.e.,
 > producer.send(..).get(),
 > this is going to have an impact on throughput, but i can't currently
 > think
 > of another way of doing it (with the current framework) that will
 > guarantee
 > at-least-once delivery.
 >
 > On Thu, 2 Feb 2017 at 17:26 Roger Vandusen <
 > roger.vandu...@ticketmaster.com>
 > wrote:
 >
 > > Thanks for the quick reply Damian.
 > >
 > > So the work-around would be to configure our source topology’s 
with a
 > > processor component that would use another app component as a
 > stand-alone
 > > KafkaProducer, let’s say an injected spring bean, configured to the
 > other
 > > (sink) cluster, and then publish sink topic messages through this
 > producer
 > > to the sink cluster?
 > >
 > > Sound like a solution? Have a better suggestion or any warnings
 > about this
 > > approach?
 > >
 > > -Roger
 > >
 > >
 > > On 2/2/17, 10:10 AM, "Damian Guy"  wrote:
 > >
 > > Hi Roger,
 > >
 > > This is not currently supported and won't be available in
 > 0.10.2.0.
 > > This has been discussed, but it doesn't look there is a JIRA 
for
 > it
 > > yet.
 > >
 > > Thanks,
 > > Damian
 > >
 > > On Thu, 2 Feb 2017 at 16:51 Roger Vandusen <
 > > roger.vandu...@ticketmaster.com>
 > > wrote:
 > >
 > > > We would like to source topics from one cluster and sink them
 > to a
 > > > different cluster from the same topology.
 > > >
 > > > If this is not currently supported then is there a KIP/JIRA 
to
 > track
 > > work
 > > > to support this in the future? 0.10.2.0?
 > > >
 > > > -Roger
 > > >
 > > >
 > >
 > >
 > >
 >
 >
 >
 





Re: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-30 Thread Jan Filipiak

Hi Eno,

thanks for breaking this up into separate points. I want to add a few
remarks inline.


Best Jan

On 30.01.2017 12:19, Eno Thereska wrote:

So I think there are several important discussion threads that are emerging 
here. Let me try to tease them apart:

1. inconsistency in what is materialized and what is not, what is queryable and 
what is not. I think we all agree there is some inconsistency there and this 
will be addressed with any of the proposed approaches. Addressing the 
inconsistency is the point of the original KIP.

2. the exact API for materializing a KTable. We can specify 1) a "store name" (as we do today) or 
2) have a ".materialize[d]" call or 3) get a handle from a KTable ".getQueryHandle" or 4) 
have a builder construct. So we have discussed 4 options. It is important to remember in this discussion that 
IQ is not designed for just local queries, but also for distributed queries. In all cases an identifying 
name/id is needed for the store that the user is interested in querying. So we end up with a discussion on 
who provides the name, the user (as done today) or if it is generated automatically (as Jan suggests, as I 
understand it). If it is generated automatically we need a way to expose these auto-generated names to the 
users and link them to the KTables they care to query.
Hi, the last sentence is what I am currently arguing against. The user
would never see a string-typed identifier name or anything like it. All he
gets is the queryHandle; if he executes a get(K) on it, that will be an
interactive query get, with all the work of finding the right servers that
currently hold a copy of the underlying store going on behind it. The nice
part is that if someone retrieves a queryHandle, you know that you have to
materialize (if you have not already), as queries will be coming. That
takes away the confusion mentioned in point 1, IMO.


3. The exact boundary between the DSL, that is the processing language, and the 
storage/IQ queries, and how we jump from one to the other. This is mostly for 
how we get a handle on a store (so it's related to point 2), rather than for 
how we query the store. I think we all agree that we don't want to limit ways 
one can query a store (e.g., using gets or range queries etc) and the query 
APIs are not in the scope of the DSL.
Does IQ currently work with range queries? A range query would have to be
started on all stores and the results then merged, perhaps by the client.
Range queries currently force a flush to RocksDB, so I am sure you would get
a performance hit right there. Time windows might be okay, but I am not sure
the first version should offer the user range access.
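
As a rough illustration of the "start the range on all stores, then merge" step, here is a sketch in plain Java. RangeMergeSketch and its range method are hypothetical names; each NavigableMap stands in for one store instance, and the sketch assumes every key lives in exactly one of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical client-side merge of per-store range results. */
final class RangeMergeSketch {

    /** Runs the same inclusive range on every shard and returns the hits in key order. */
    static <K extends Comparable<K>, V> List<Map.Entry<K, V>> range(
            List<NavigableMap<K, V>> shards, K from, K to) {
        TreeMap<K, V> merged = new TreeMap<>();
        for (NavigableMap<K, V> shard : shards) {
            // Assumption: shards hold disjoint keys, so putAll never overwrites.
            merged.putAll(shard.subMap(from, true, to, true));
        }
        return new ArrayList<>(merged.entrySet());
    }
}
```

The cost concern raised above shows up directly: every shard has to be asked, even when most of them contribute nothing to the result.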


4. The nature of the DSL and whether it's declarative enough, or flexible 
enough. Damian made the point that he likes the builder pattern since users can 
specify, per KTable, things like caching and logging needs. His observation (as 
I understand it) is that the processor API (PAPI) is flexible but doesn't 
provide any help at all to users. The current DSL provides declarative 
abstractions, but it's not fine-grained enough. This point is much broader than 
the KIP, but discussing it in this KIP's context is ok, since we don't want to 
make small piecemeal changes and then realise we're not in the spot we want to 
be.
This is indeed much broader. My guess is that this is why both APIs
exist, and helping users switch back and forth might be a thing.


Feel free to pitch in if I have misinterpreted something.

Thanks
Eno



On 30 Jan 2017, at 10:22, Jan Filipiak  wrote:

Hi Eno,

I have a really hard time understanding why we can't. From my point of view
everything could be super elegant: DSL only, plus the public API for the
PAPI people as it already exists.

The above approach of implementing a .get(K) on KTable is foolish in my
opinion, as it would be too late to know that materialisation is required.
But having an API that lets the user indicate "I want to query this table" and
then wraps, say, the table's processor name can work out really, really nicely.
The only obstacle I see is people not being willing to spend the additional
time on the implementation and just wanting a quick-shot option to make it work.

For me it would look like this:

table = builder.table()
filteredTable = table.filter()
// Does the materialisation; really all names are possible, but I'd rather
// hide the implication that it materializes
rawHandle = table.getQueryHandle()
// this would _not_ materialize again of course; the source or the
// aggregator would stay the only materialized processors
filteredTableHandle = filteredTable.getQueryHandle()
streams = new streams(builder)

This middle part is highly flexible. I could imagine forcing the user to do
something like this. It implies to the user that his streams instance needs
to be running, instead of propagating the missing initialisation back via
exceptions. Whether the user is forced to pass the appropriate streams
instance back can also change.
I think its

Re: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-30 Thread Jan Filipiak
ndowing triggers as well. For serdes
specifically, we had a very long discussion about it and concluded that, at
least in Java7, we cannot completely abstract serde away in the DSL, so we
choose the other extreme to enforce users to be completely aware of the
serde requirements when some KTables may need to be materialized via
overloaded API functions. While for the state store names, I feel it is a
different argument than serdes (details below).


So to me, for either materialize() v.s. overloaded functions directions,
the first thing I'd like to resolve is the inconsistency issue mentioned
above. So in either case: KTable materialization will not be affected by user
providing state store name or not, but will only be decided by the library
when it is necessary. More specifically, only join operator and
builder.table() resulted KTables are not always materialized, but are still
likely to be materialized lazily (e.g. when participated in a join
operator).


For overloaded functions that would mean:

a) we have an overloaded function for ALL operators that could result
in a KTable, and allow it to be null (i.e. for the function without this
param it is null by default);
b) a null state store name does not indicate that a KTable would not be
materialized, but that it will not be used for IQ at all (internal state
store names will be generated when necessary).


For materialize() that would mean:

a) we will remove state store names from ALL operators that could
result in a KTable.
b) not calling materialize() on a KTable does not indicate that it
would not be materialized, but that it will not be used for IQ at all
(internal state store names will be generated when necessary).


Again, either way the API itself does not "hint" about anything for
materializing a KTable or not at all; it is still purely determined by the
library when parsing the DSL for now.

Following these thoughts, I feel that 1) we should probably change the name
"materialize", since it may mislead users as to what actually happens
behind the scenes, to e.g. Damian's suggested "queryableStore(String storeName)",
which returns a QueryableStateStore, and can replace the
`KafkaStreams.store` function; 2) comparing those two options assuming we
get rid of the misleading function name, I personally favor not adding more
overloading functions as it keeps the API simpler.



Guozhang


On Sat, Jan 28, 2017 at 2:32 PM, Jan Filipiak 
wrote:


Hi,

thanks for your mail, it felt like this can clarify some things! The thread
unfortunately split, but as all branches close in on what my suggestion was
about, I'll pick this one to continue.

Of course only the table the user wants to query would be materialized
(retrieving the query handle implies materialisation). So in the example of
KTable::filter, if you call
getIQHandle on both tables, only the one source that is there would
materialize, and the QueryHandle abstraction would make sure it gets mapped
and filtered and whatnot upon read as usual.

Of course the object you would retrieve would maybe only wrap the
storeName / table unique identifier and a way to access the streams
instance, and then basically use the same mechanism that is currently used.
From my point of view this is the least confusing way for DSL users. If
it's too tricky to get a hand on the streams instance, one could ask the user
to pass it in before executing queries, thereby making sure the streams
instance has been built.

The effort to implement this is indeed some orders of magnitude higher
than the overloaded materialized call. As long as I could help with getting a
different view I am happy.

Best Jan


On 28.01.2017 09:36, Eno Thereska wrote:


Hi Jan,

I understand your concern. One implication of not passing any store name
and just getting an IQ handle is that all KTables would need to be
materialised. Currently the store name (or proposed .materialize() call)
act as hints on whether to materialise the KTable or not. Materialising
every KTable can be expensive, although there are some tricks one can play,
e.g., have a virtual store rather than one backed by a Kafka topic.

However, even with the above, after getting an IQ handle, the user would
still need to use IQ APIs to query the state. As such, we would still
continue to be outside the original DSL so this wouldn't address your
original concern.

So I read this suggestion as simplifying the APIs by removing the store
name, at the cost of having to materialise every KTable. It's definitely an
option we'll consider as part of this KIP.

Thanks
Eno


On 28 Jan 2017, at 06:49, Jan Filipiak  wrote:

Hi Exactly

I know it works from the Processor API, but my suggestion would prevent
DSL users from dealing with store names whatsoever.

In general I am pro switching between DSL and Processor API easily. (In
my Stream applications I do this a lot with reflection and instantiating
KTableImpl) Concerning this KIP all I say is that 

Re: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-28 Thread Jan Filipiak

Hi,

thanks for your mail, it felt like this can clarify some things! The thread
unfortunately split, but as all branches close in on what my suggestion
was about, I'll pick this one to continue.


Of course only the table the user wants to query would be materialized
(retrieving the query handle implies materialisation). So in the example
of KTable::filter, if you call
getIQHandle on both tables, only the one source that is there would
materialize, and the QueryHandle abstraction would make sure it gets
mapped and filtered and whatnot upon read as usual.


Of course the object you would retrieve would maybe only wrap the
storeName / table unique identifier and a way to access the streams
instance, and then basically use the same mechanism that is currently used.
From my point of view this is the least confusing way for DSL users. If
it's too tricky to get a hand on the streams instance, one could ask the
user to pass it in before executing queries, thereby making sure the
streams instance has been built.

The effort to implement this is indeed some orders of magnitude higher
than the overloaded materialized call. As long as I could help with getting
a different view I am happy.


Best Jan

On 28.01.2017 09:36, Eno Thereska wrote:

Hi Jan,

I understand your concern. One implication of not passing any store name and 
just getting an IQ handle is that all KTables would need to be materialised. 
Currently the store name (or proposed .materialize() call) act as hints on 
whether to materialise the KTable or not. Materialising every KTable can be 
expensive, although there are some tricks one can play, e.g., have a virtual 
store rather than one backed by a Kafka topic.

However, even with the above, after getting an IQ handle, the user would still 
need to use IQ APIs to query the state. As such, we would still continue to be 
outside the original DSL so this wouldn't address your original concern.

So I read this suggestion as simplifying the APIs by removing the store name, 
at the cost of having to materialise every KTable. It's definitely an option 
we'll consider as part of this KIP.

Thanks
Eno



On 28 Jan 2017, at 06:49, Jan Filipiak  wrote:

Hi Exactly

I know it works from the Processor API, but my suggestion would prevent DSL
users from dealing with store names whatsoever.

In general I am pro switching between DSL and Processor API easily. (In my
Stream applications I do this a lot with reflection and instantiating
KTableImpl.) Concerning this KIP, all I say is that there should be a DSL
concept of "I want to expose this __KTable__". This can be a method like
KTable::retrieveIQHandle():InteractiveQueryHandle; the table would know to
materialize, and the user would have a reference to the "store and the
distributed query mechanism" via the Interactive Query Handle. Under the hood
it can use the same mechanism as the PAPI people again.

I hope you see my point :)

Best Jan


#DeathToIQMoreAndBetterConnectors :)




On 27.01.2017 21:59, Matthias J. Sax wrote:

Jan,

the IQ feature is not limited to Streams DSL but can also be used for
Stores used in PAPI. Thus, we need a mechanism that does work for PAPI
and DSL.

Nevertheless I see your point and I think we could provide a better API
for KTable stores including the discovery of remote shards of the same
KTable.

@Michael: Yes, right now we do have a lot of overloads and I am not a
big fan of those -- I would rather prefer a builder pattern. But that
might be a different discussion (nevertheless, if we would aim for a API
rework, we should get the changes with regard to stores right from the
beginning on, in order to avoid a redesign later on.)

something like:

stream.groupByKey()
   .window(TimeWindow.of(5000))
   .aggregate(...)
   .withAggValueSerde(new CustomTypeSerde())
   .withStoreName("storeName");


(This would also reduce JavaDoc redundancy -- maybe a personal pain
point right now :))


-Matthias

On 1/27/17 11:10 AM, Jan Filipiak wrote:

Yeah,

Maybe it's my bad that I refuse to look into IQ, as I don't find it anywhere
close to being interesting. The problem IMO is that people need to know
the store name, so we are working on different levels to achieve a
single goal.

What is your opinion on having a method on KTable that returns
something like a key-value store? There are of course problems like
"it can't be used before the stream threads are running and group membership
is established...", but the benefit would be that for the user there is a
consistent way of saying "Hey, I need it materialized, as queries are going to
be coming" and already getting a thing that he can execute the queries on, in
one step.
What I think is unintuitive here is that you need to say materialize on this
KTable, then go somewhere else and find its store name, and then
go to the KafkaStreams instance and ask for the store with this name.

So one could help the user stay

Re: Fwd: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-27 Thread Jan Filipiak

Hi Gwen,

this is not a hint as in "make it smarter"; this is a hint as in "make it
work", which should not require hinting.


Best Jan




On 27.01.2017 22:35, Gwen Shapira wrote:

Another vote in favor of overloading. I think the streams API actually
trains users quite well in realizing the implications of adding a
state-store - we need to figure out the correct Serde every single
time :)

Another option: "materialize" behaves almost as a SQL hint - i.e.
allows a user to control an implementation detail while working inside
a DSL that usually hides them. We should consider that this may not be
the last hint we'll need ("cache results", "predicate pushdown", hash
join vs merge join, etc), but in most cases, we won't be able to infer
a hint from the existence of an argument like state-store name.
Matthias's suggestion to make .materialize() a top level method is
awkward precisely because it doesn't fit into the DSL model very well,
but if we have a generalized way to "hint" at operations, this could
be a good fit.

On Fri, Jan 27, 2017 at 7:49 AM, Michael Noll  wrote:

Like Damian, and for the same reasons, I am more in favor of overloading
methods rather than introducing `materialize()`.
FWIW, we already have a similar API setup for e.g.
`KTable#through(topicName, stateStoreName)`.

A related but slightly different question is what e.g. Jan Filipiak
mentioned earlier in this thread:
I think we need to explain more clearly why KIP-114 doesn't propose the
seemingly simpler solution of always materializing tables/state stores.



On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak 
wrote:


Hi,

Yeah, it's confusing. Why shouldn't it be queryable by IQ? If you use the
ValueGetter of Filter, it will apply the filter and should be completely
transparent as to whether another processor or IQ is accessing it. How can
this new method help?

I cannot see the reason for the additional materialize method being
required! Hence I suggest leaving it alone.
Regarding removing the others, I don't have strong opinions, and it seems to
be unrelated.

Best Jan




On 26.01.2017 20:48, Eno Thereska wrote:


Forwarding this thread to the users list too in case people would like to
comment. It is also on the dev list.

Thanks
Eno

Begin forwarded message:

From: "Matthias J. Sax" 
Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved
semantics
Date: 24 January 2017 at 19:30:10 GMT
To: d...@kafka.apache.org
Reply-To: d...@kafka.apache.org

That's not what I meant by "huge impact".

I refer to the actions related to materialize a KTable: creating a
RocksDB store and a changelog topic -- users should be aware about
runtime implication and this is better expressed by an explicit method
call, rather than implicitly triggered by using a different overload of
a method.


-Matthias

On 1/24/17 1:35 AM, Damian Guy wrote:


I think your definition of a huge impact and mine are rather different
;-P
Overloading a few methods  is not really a huge impact IMO. It is also a
sacrifice worth making for readability, usability of the API.

On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax 
wrote:

I understand your argument, but do not agree with it.

Your first version (even if the "flow" is not as nice) is more explicit
than the second version. Adding a stateStoreName parameter is quite
implicit but has a huge impact -- thus, I prefer the rather more verbose
but explicit version.


-Matthias

On 1/23/17 1:39 AM, Damian Guy wrote:


I'm not a fan of materialize. I think it interrupts the flow, i.e,

table.mapValues(..).materialize().join(..).materialize()
compared to:
table.mapValues(..).join(..)

I know which one I prefer.
My preference is still to provide overloaded methods where people can
specify the store names if they want, otherwise we just generate them.

On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax wrote:


Hi,

thanks for the KIP Eno! Here are my 2 cents:

1) I like Guozhang's proposal about removing store name from all KTable
methods and generate internal names (however, I would do this as
overloads). Furthermore, I would not force users to call .materialize()
if they want to query a store, but add one more method .stateStoreName()
that returns the store name if the KTable is materialized. Thus, also
.materialize() must not necessarily have a parameter storeName (ie, we
should have some overloads here).

I would also not allow to provide a null store name (to indicate no
materialization if not necessary) but throw an exception.

This yields some simplification (see below).


2) I also like Guozhang's proposal about KStream#toTable()


3)


   3. What will happen when you call materialize on KTable that is already
   materialized? Will it create another StateStore (providing the name is
   different), throw an Exception?

Currently an exception is thrown, but see below.


If we follow approach (1) from Guozhang, there

Re: Fwd: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-27 Thread Jan Filipiak

Hi Exactly

I know it works from the Processor API, but my suggestion would prevent
DSL users from dealing with store names whatsoever.


In general I am pro switching between DSL and Processor API easily. (In
my Stream applications I do this a lot with reflection and instantiating
KTableImpl.) Concerning this KIP, all I say is that there should be a DSL
concept of "I want to expose this __KTable__". This can be a method like
KTable::retrieveIQHandle():InteractiveQueryHandle; the table would know
to materialize, and the user would have a reference to the "store and the
distributed query mechanism" via the Interactive Query Handle. Under the
hood it can use the same mechanism as the PAPI people again.


I hope you see my point :)

Best Jan


#DeathToIQMoreAndBetterConnectors :)




On 27.01.2017 21:59, Matthias J. Sax wrote:

Jan,

the IQ feature is not limited to Streams DSL but can also be used for
Stores used in PAPI. Thus, we need a mechanism that does work for PAPI
and DSL.

Nevertheless I see your point and I think we could provide a better API
for KTable stores including the discovery of remote shards of the same
KTable.

@Michael: Yes, right now we do have a lot of overloads and I am not a
big fan of those -- I would rather prefer a builder pattern. But that
might be a different discussion (nevertheless, if we would aim for a API
rework, we should get the changes with regard to stores right from the
beginning on, in order to avoid a redesign later on.)

something like:

stream.groupByKey()
   .window(TimeWindow.of(5000))
   .aggregate(...)
   .withAggValueSerde(new CustomTypeSerde())
   .withStoreName("storeName");


(This would also reduce JavaDoc redundancy -- maybe a personal pain
point right now :))


-Matthias

On 1/27/17 11:10 AM, Jan Filipiak wrote:

Yeah,

Maybe it's my bad that I refuse to look into IQ, as I don't find it anywhere
close to being interesting. The problem IMO is that people need to know
the store name, so we are working on different levels to achieve a
single goal.

What is your opinion on having a method on KTable that returns
something like a key-value store? There are of course problems like
"it can't be used before the stream threads are running and group membership
is established...", but the benefit would be that for the user there is a
consistent way of saying "Hey, I need it materialized, as queries are going to
be coming" and already getting a thing that he can execute the queries on, in
one step.
What I think is unintuitive here is that you need to say materialize on this
KTable, then go somewhere else and find its store name, and then
go to the KafkaStreams instance and ask for the store with this name.

So one could help the user stay in DSL land and therefore maybe
confuse him less.

Best Jan

#DeathToIQMoreAndBetterConnectors :)



On 27.01.2017 16:51, Damian Guy wrote:

I think Jan is saying that they don't always need to be materialized,
i.e.,
filter just needs to apply the ValueGetter, it doesn't need yet another
physical state store.

On Fri, 27 Jan 2017 at 15:49 Michael Noll  wrote:


Like Damian, and for the same reasons, I am more in favor of overloading
methods rather than introducing `materialize()`.
FWIW, we already have a similar API setup for e.g.
`KTable#through(topicName, stateStoreName)`.

A related but slightly different question is what e.g. Jan Filipiak
mentioned earlier in this thread:
I think we need to explain more clearly why KIP-114 doesn't propose the
seemingly simpler solution of always materializing tables/state stores.



On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak 
wrote:


Hi,

Yeah, it's confusing. Why shouldn't it be queryable by IQ? If you use the
ValueGetter of Filter, it will apply the filter and should be completely
transparent as to whether another processor or IQ is accessing it. How can
this new method help?

I cannot see the reason for the additional materialize method being
required! Hence I suggest leaving it alone.
Regarding removing the others, I don't have strong opinions, and it seems to
be unrelated.

Best Jan




On 26.01.2017 20:48, Eno Thereska wrote:


Forwarding this thread to the users list too in case people would like to
comment. It is also on the dev list.

Thanks
Eno

Begin forwarded message:

From: "Matthias J. Sax" 
Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved
semantics
Date: 24 January 2017 at 19:30:10 GMT
To: d...@kafka.apache.org
Reply-To: d...@kafka.apache.org

That's not what I meant by "huge impact".

I refer to the actions related to materialize a KTable: creating a
RocksDB store and a changelog topic -- users should be aware about
runtime implication and this is better expressed by an explicit method
call, rather than implicitly triggered by using a different overload of
a method.


-Matthias

On 1/24/17 1:35 AM, Damian Guy wrote:


I think your definition of a huge impact and mine are rather
di

Re: Fwd: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-27 Thread Jan Filipiak

Yeah,

Maybe it's my bad that I refuse to look into IQ, as I don't find it anywhere
close to being interesting. The problem IMO is that people need to know
the store name, so we are working on different levels to achieve a
single goal.


What is your opinion on having a method on KTable that returns
something like a key-value store? There are of course problems like
"it can't be used before the stream threads are running and group membership
is established...", but the benefit would be that for the user there is a
consistent way of saying "Hey, I need it materialized, as queries are going to
be coming" and already getting a thing that he can execute the queries on, in
one step.
What I think is unintuitive here is that you need to say materialize on this
KTable, then go somewhere else and find its store name, and then
go to the KafkaStreams instance and ask for the store with this name.


So one could help the user stay in DSL land and therefore maybe
confuse him less.


Best Jan

#DeathToIQMoreAndBetterConnectors :)



On 27.01.2017 16:51, Damian Guy wrote:

I think Jan is saying that they don't always need to be materialized, i.e.,
filter just needs to apply the ValueGetter, it doesn't need yet another
physical state store.

On Fri, 27 Jan 2017 at 15:49 Michael Noll  wrote:


Like Damian, and for the same reasons, I am more in favor of overloading
methods rather than introducing `materialize()`.
FWIW, we already have a similar API setup for e.g.
`KTable#through(topicName, stateStoreName)`.

A related but slightly different question is what e.g. Jan Filipiak
mentioned earlier in this thread:
I think we need to explain more clearly why KIP-114 doesn't propose the
seemingly simpler solution of always materializing tables/state stores.



On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak 
wrote:


Hi,

Yeah, it's confusing. Why shouldn't it be queryable by IQ? If you use the
ValueGetter of Filter, it will apply the filter and should be completely
transparent as to whether another processor or IQ is accessing it. How can
this new method help?

I cannot see the reason for the additional materialize method being
required! Hence I suggest leaving it alone.
Regarding removing the others, I don't have strong opinions, and it seems to
be unrelated.

Best Jan




On 26.01.2017 20:48, Eno Thereska wrote:


Forwarding this thread to the users list too in case people would like to
comment. It is also on the dev list.

Thanks
Eno

Begin forwarded message:

From: "Matthias J. Sax" 
Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved
semantics
Date: 24 January 2017 at 19:30:10 GMT
To: d...@kafka.apache.org
Reply-To: d...@kafka.apache.org

That's not what I meant by "huge impact".

I refer to the actions related to materialize a KTable: creating a
RocksDB store and a changelog topic -- users should be aware about
runtime implication and this is better expressed by an explicit method
call, rather than implicitly triggered by using a different overload of
a method.


-Matthias

On 1/24/17 1:35 AM, Damian Guy wrote:


I think your definition of a huge impact and mine are rather different
;-P
Overloading a few methods is not really a huge impact IMO. It is also a
sacrifice worth making for readability, usability of the API.

On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax 
wrote:

I understand your argument, but do not agree with it.

Your first version (even if the "flow" is not as nice) is more explicit
than the second version. Adding a stateStoreName parameter is quite
implicit but has a huge impact -- thus, I prefer the rather more verbose
but explicit version.


-Matthias

On 1/23/17 1:39 AM, Damian Guy wrote:


I'm not a fan of materialize. I think it interrupts the flow, i.e,

table.mapValues(..).materialize().join(..).materialize()
compared to:
table.mapValues(..).join(..)

I know which one I prefer.
My preference is still to provide overloaded methods where people can
specify the store names if they want, otherwise we just generate them.

On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax 
wrote:


Hi,

thanks for the KIP Eno! Here are my 2 cents:

1) I like Guozhang's proposal about removing store name from all KTable
methods and generate internal names (however, I would do this as
overloads). Furthermore, I would not force users to call .materialize()
if they want to query a store, but add one more method .stateStoreName()
that returns the store name if the KTable is materialized. Thus, also
.materialize() must not necessarily have a parameter storeName (ie, we
should have some overloads here).

I would also not allow to provide a null store name (to indicate no
materialization if not necessary) but throw an exception.

This yields some simplification (see below).


2) I also like Guozhang's proposal about KStream#toTable()


3)


   3. What will happen when you call materialize on KTable that is
already
   material

Re: Fwd: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-27 Thread Jan Filipiak

Hi,

Yeah, it's confusing. Why shouldn't it be queryable by IQ? If you use the
ValueGetter of Filter, it will apply the filter and should be completely
transparent as to whether another processor or IQ is accessing it. How can
this new method help?
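
To illustrate why a filtered table would not need a second physical store, here is a small plain-Java sketch. FilterViewSketch, its ValueGetter interface and the filter method are hypothetical stand-ins, loosely modelled on the value-getter idea mentioned above; they are not the actual Kafka Streams internals.

```java
import java.util.function.BiPredicate;

/** Hypothetical model of reading a filtered table through its parent's store. */
final class FilterViewSketch {

    /** Minimal read interface; a real getter would sit on top of a state store. */
    interface ValueGetter<K, V> {
        V get(K key);
    }

    /** Wraps the parent getter; values that fail the predicate read as absent (null). */
    static <K, V> ValueGetter<K, V> filter(ValueGetter<K, V> parent,
                                           BiPredicate<K, V> predicate) {
        return key -> {
            V value = parent.get(key);
            return (value != null && predicate.test(key, value)) ? value : null;
        };
    }
}
```

Whether the caller is another processor or an interactive query makes no difference to the wrapper: both read the parent's store and see the filter applied on read.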


I cannot see the reason for the additional materialize method being
required! Hence I suggest leaving it alone.
Regarding removing the others, I don't have strong opinions, and it seems
to be unrelated.


Best Jan



On 26.01.2017 20:48, Eno Thereska wrote:

Forwarding this thread to the users list too in case people would like to 
comment. It is also on the dev list.

Thanks
Eno


Begin forwarded message:

From: "Matthias J. Sax" 
Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved semantics
Date: 24 January 2017 at 19:30:10 GMT
To: d...@kafka.apache.org
Reply-To: d...@kafka.apache.org

That's not what I meant by "huge impact".

I refer to the actions related to materialize a KTable: creating a
RocksDB store and a changelog topic -- users should be aware about
runtime implication and this is better expressed by an explicit method
call, rather than implicitly triggered by using a different overload of
a method.


-Matthias

On 1/24/17 1:35 AM, Damian Guy wrote:

I think your definition of a huge impact and mine are rather different ;-P
Overloading a few methods  is not really a huge impact IMO. It is also a
sacrifice worth making for readability, usability of the API.

On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax  wrote:


I understand your argument, but do not agree with it.

Your first version (even if the "flow" is not as nice) is more explicit
than the second version. Adding a stateStoreName parameter is quite
implicit but has a huge impact -- thus, I prefer the rather more verbose
but explicit version.


-Matthias

On 1/23/17 1:39 AM, Damian Guy wrote:

I'm not a fan of materialize. I think it interrupts the flow, i.e,

table.mapValues(..).materialize().join(..).materialize()
compared to:
table.mapValues(..).join(..)

I know which one I prefer.
My preference is still to provide overloaded methods where people can
specify the store names if they want, otherwise we just generate them.

On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax wrote:

Hi,

thanks for the KIP Eno! Here are my 2 cents:

1) I like Guozhang's proposal about removing store name from all KTable
methods and generate internal names (however, I would do this as
overloads). Furthermore, I would not force users to call .materialize()
if they want to query a store, but add one more method .stateStoreName()
that returns the store name if the KTable is materialized. Thus, also
.materialize() must not necessarily have a parameter storeName (ie, we
should have some overloads here).

I would also not allow to provide a null store name (to indicate no
materialization if not necessary) but throw an exception.

This yields some simplification (see below).


2) I also like Guozhang's proposal about KStream#toTable()


3)

  3. What will happen when you call materialize on KTable that is already
  materialized? Will it create another StateStore (providing the name is
  different), throw an Exception?

Currently an exception is thrown, but see below.



If we follow approach (1) from Guozhang, there is no need to worry about
a second materialization and also no exception must be thrown. A call to
.materialize() basically sets a "materialized flag" (ie, idempotent
operation) and sets a new name.
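
A tiny plain-Java sketch of that "flag plus name" behaviour follows. MaterializeFlagSketch and its methods are hypothetical names for illustration only; the counter exists just to show that a repeated call does not create a second store.

```java
/** Hypothetical model of an idempotent materialize() call. */
final class MaterializeFlagSketch {
    private boolean materialized = false;
    private String storeName = null;
    private int storesCreated = 0;

    /** Sets the flag once; calling again only replaces the name. */
    MaterializeFlagSketch materialize(String name) {
        if (!materialized) {
            materialized = true;
            storesCreated++; // stands in for creating the backing store
        }
        storeName = name; // the most recent name wins
        return this;
    }

    boolean isMaterialized() {
        return materialized;
    }

    String storeName() {
        return storeName;
    }

    int storesCreated() {
        return storesCreated;
    }
}
```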


4)

Rename toStream() to toKStream() for consistency.

Not sure whether that is really required. We also use
`KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
don't care about the "K" prefix.

Eno's reply:

I think changing it to `toKStream` would make it absolutely clear what
we are converting it to.

I'd say we should probably change the KStreamBuilder methods (but not in
this KIP).

I would keep #toStream(). (see below)


5) We should not remove any methods but only deprecate them.



A general note:

I do not understand your comments "Rejected Alternatives". You say "Have
the KTable be the materialized view" was rejected. But your KIP actually
does exactly this -- the changelog abstraction of KTable is secondary
after those changes and the "view" abstraction is what a KTable is. And
just to be clear, I like this a lot:

- it aligns with the name KTable
- it aligns with stream-table-duality
- it aligns with IQ

I would say that a KTable is a "view abstraction" (as materialization is
optional).



-Matthias




On 1/22/17 5:05 PM, Guozhang Wang wrote:

Thanks for the KIP Eno, I have a few meta comments and a few detailed
comments:

1. I like the materialize() function in general, but I would like to see
how other KTable functions should be updated accordingly. For example, 1)
KStreamBuilder.table(..) has a state store name parameter, and we will
always materialize the KTable unless its state store name is set to null;
2) KTable.agg requires the result KTable to be material

Re: Kafka Logo as HighRes or Vectorgraphics

2016-12-02 Thread Jan Filipiak

Hi,

I was just pointed to this. https://www.vectorlogo.zone/logos/apache_kafka/
if someone else is looking for the same thing! thanks a lot

Best Jan

On 01.12.2016 13:05, Jan Filipiak wrote:

Hi Everyone,

we want to print some big banners of the Kafka logo to decorate our 
offices. Can anyone help me find a version
of the kafka logo that would still look nice printed onto 2x4m flags?
Highly appreciated!

Best Jan




Kafka Logo as HighRes or Vectorgraphics

2016-12-01 Thread Jan Filipiak

Hi Everyone,

we want to print some big banners of the Kafka logo to decorate our 
offices. Can anyone help me find a version
of the kafka logo that would still look nice printed onto 2x4m flags?
Highly appreciated!

Best Jan


Re: [VOTE] Add REST Server to Apache Kafka

2016-10-26 Thread Jan Filipiak
And you also still need to find the correct broker for each HTTP call, 
which is also hard when programming against the HTTP API.


On 26.10.2016 09:46, Jan Filipiak wrote:

So happy to see this reply.
I do think the same, actually makes it way harder to properly batch up 
records on http, as kafka core would need to know how to split your 
payload.

It would help people do the wrong thing IMO

best Jan

On 25.10.2016 23:58, Jay Kreps wrote:

-1

I think the REST server for Kafka that already exists is quite good and
getting contributions. Moving this into the core project doesn't solve a
problem that I see.

-Jay

On Tue, Oct 25, 2016 at 2:16 PM, Harsha Chintalapani 
wrote:


Hi All,
We are proposing to have a REST Server as part of Apache Kafka
to provide producer/consumer/admin APIs. We Strongly believe having
REST server functionality with Apache Kafka will help a lot of users.
Here is the KIP that Mani Kumar wrote
https://cwiki.apache.org/confluence/display/KAFKA/KIP-80:+Kafka+Rest+Server.
There is a discussion thread in dev list that had differing opinions on
whether to include REST server in Apache Kafka or not. You can read more
about that in this thread
http://mail-archives.apache.org/mod_mbox/kafka-dev/201610.mbox/%3CCAMVt_aymqeudm39znsxgktpdde46sowmqhsxop-+jmbcuv7...@mail.gmail.com%3E

   This is a VOTE thread to check interest in the community for
adding REST Server implementation in Apache Kafka.

Thanks,
Harsha







Re: [VOTE] Add REST Server to Apache Kafka

2016-10-26 Thread Jan Filipiak

So happy to see this reply.
I do think the same, actually makes it way harder to properly batch up 
records on http, as kafka core would need to know how to split your payload.

It would help people do the wrong thing IMO

best Jan

On 25.10.2016 23:58, Jay Kreps wrote:

-1

I think the REST server for Kafka that already exists is quite good and
getting contributions. Moving this into the core project doesn't solve a
problem that I see.

-Jay

On Tue, Oct 25, 2016 at 2:16 PM, Harsha Chintalapani 
wrote:


Hi All,
We are proposing to have a REST Server as part of  Apache Kafka
to provide producer/consumer/admin APIs. We Strongly believe having
REST server functionality with Apache Kafka will help a lot of users.
Here is the KIP that Mani Kumar wrote
https://cwiki.apache.org/confluence/display/KAFKA/KIP-80:+Kafka+Rest+Server.
There is a discussion thread in dev list that had differing opinions on
whether to include REST server in Apache Kafka or not. You can read more
about that in this thread
http://mail-archives.apache.org/mod_mbox/kafka-dev/201610.mbox/%3CCAMVt_aymqeudm39znsxgktpdde46sowmqhsxop-+jmbcuv7...@mail.gmail.com%3E

   This is a VOTE thread to check interest in the community for
adding REST Server implementation in Apache Kafka.

Thanks,
Harsha





Re: KIP-33 Opt out from Time Based indexing

2016-09-08 Thread Jan Filipiak

Hi Jun,

thanks a lot for the hint, I'll check it out when I get a free minute!

Best Jan

On 07.09.2016 00:35, Jun Rao wrote:

Jan,

For the time rolling issue, Jiangjie has committed a fix (
https://issues.apache.org/jira/browse/KAFKA-4099) to trunk. Perhaps you can
help test out trunk and see if there are any other issues related to
time-based index?

Thanks,

Jun

On Mon, Sep 5, 2016 at 11:52 PM, Jan Filipiak 
wrote:


Hi Jun,

sorry for the late reply. Regarding B, my main concern was just the
complexity of understanding what's going on.
As you can see, it took me probably some 2 days or so to fully grasp all
the details in the implementation and what
the impacts are. Usually I prefer to turn things I don't use off, so I
don't have to bother. Log Append time will work for me.

Rolling logs was my main concern. The producer can specify the timestamp,
and we use epoch inside the message. I'd bet money
people in the company would have put this epoch also in the produce
record. => rolling logs, as the broker thinks it's millis.
So that would probably have caused us at least one outage if a big
producer had upgraded and done this; IMO a likely mistake.

I'd just hoped for a more obvious kill-switch, so I didn’t need to bother
that much.

Best Jan





On 29.08.2016 19:36, Jun Rao wrote:


Jan,

For the usefulness of time index, it's ok if you don't plan to use it.
However, I do think there are other people who will want to use it. Fixing
an application bug always requires some additional work. Intuitively, being
able to seek back to a particular point of time for replay is going to be
much more efficient than always replaying from the very beginning,
especially when the log is retained for a long period of time. Sure, if you
want to have more confidence, you want to rewind a bit conservatively. But
being able to rewind an extra hour makes a big difference from having to
rewind all the way to 7 days or however long the retention time is.

For the OffsetRequest, I actually agree with you that it's useful. People
can use that to find the first and the last offset and the offset based on
a specific point in time. The part that's a bit awkward with OffsetRequest
is that it's based on the last modified time of the log segment, which
makes it imprecise (precision is at the segment level, not message level)
and non-deterministic (last modified time may change). Another awkwardness
is that it supports returning a list of offsets after a specified
timestamp. We did that simply because timestamp was only at the segment
level then. So, our plan is to replace OffsetRequest with a new one. It
will give you the same functionality: find the first and the last offset
and the offset based on a specific point in time. It will just be better
since it's more precise and more deterministic. For your use case, it seems
that you don't care about message creation time. Then, it's possible for
you to configure the broker with the log append time. Whether this should
be default at the Kafka level is debatable, but it won't prevent your use
case.

For your suggestion on refactoring, I still want to understand how
necessary it is. Your main concerns so far seem to be:
(a) Impact on rolling log segments.
(b) Time-based index is not useful for me.

Item (a) is a good point. Thanks for that. We will fix it. Item (b), I have
given my view on this above. Are there any other things that you think that
having a time-based index will hurt?

Thanks,

Jun

On Fri, Aug 26, 2016 at 3:41 PM, Jan Filipiak 
wrote:

Hi Jun,

thanks for taking the time to answer on such a detailed level. You are
right, Log.fetchOffsetByTimestamp works; the comment is just confusing:
"// Get all the segments whose largest timestamp is smaller than target
timestamp", which apparently is not what takeWhile does (I am more on
the Java end of things, so I relied on the comment).

Regarding the frequent file rolling, I didn't think of log compaction, but
that indeed is a place where things can hit the fan pretty easily, especially
if you don't have many updates in there and you pass the timestamp along in
a kafka-streams application. Bootstrapping a new application then indeed
could produce quite a few old messages, kicking off this log rolling until a
recent message appears. I guess that makes it a practical issue again even
with the 7 days. Thanks for pointing that out! I'd like to see the appendTime
as the default. I am very happy that I have it in my back pocket, so I can
sleep tighter and not worry too much about someone accidentally doing
something dodgy on a weekend with our clusters.

Regarding the usefulness, you will not be able to sell it to me. I don't
know how people build applications with this ¯\_(ツ)_/¯ but I don't want to
see them.
Look at the error recovery with timestamp seek:
For fixing a bug, a user needs to stop the SP, truncate all his downstream
data perfectly based on their ti

Re: KIP-33 Opt out from Time Based indexing

2016-09-05 Thread Jan Filipiak

Hi Jun,

sorry for the late reply. Regarding B, my main concern was just the 
complexity of understanding what's going on.
As you can see, it took me probably some 2 days or so to fully grasp all 
the details in the implementation and what
the impacts are. Usually I prefer to turn things I don't use off, so I 
don't have to bother. Log Append time will work for me.


Rolling logs was my main concern. The producer can specify the timestamp, 
and we use epoch inside the message. I'd bet money 
people in the company would have put this epoch also in the produce 
record. => rolling logs, as the broker thinks it's millis.
So that would probably have caused us at least one outage if a big 
producer had upgraded and done this; IMO a likely mistake.
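
To make the unit mix-up above concrete, here is a minimal sketch. It assumes the in-message epoch value is in seconds (the mail does not say so explicitly, it only says the broker would take it for millis). The class and method names are made up for illustration and are not part of Kafka.

```java
import java.time.Instant;

final class EpochUnits {
    // Reads the raw number as milliseconds since the epoch, which is how
    // the broker is said to interpret the record timestamp.
    static Instant asMillis(long raw) {
        return Instant.ofEpochMilli(raw);
    }

    // Reads the same raw number as seconds since the epoch (the assumed
    // unit of the in-message value).
    static Instant asSeconds(long raw) {
        return Instant.ofEpochSecond(raw);
    }
}
```

For a value such as 1472000000, `asSeconds` lands in late August 2016 while `asMillis` lands in January 1970, so a broker comparing that timestamp against the current time would see a segment that looks decades old.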


I'd just hoped for a more obvious kill-switch, so I didn’t need to bother 
that much.


Best Jan




On 29.08.2016 19:36, Jun Rao wrote:

Jan,

For the usefulness of time index, it's ok if you don't plan to use it.
However, I do think there are other people who will want to use it. Fixing
an application bug always requires some additional work. Intuitively, being
able to seek back to a particular point of time for replay is going to be
much more efficient than always replaying from the very beginning,
especially when the log is retained for a long period of time. Sure, if you
want to have more confidence, you want to rewind a bit conservatively. But
being able to rewind an extra hour makes a big difference from having to
rewind all the way to 7 days or however long the retention time is.

For the OffsetRequest, I actually agree with you that it's useful. People
can use that to find the first and the last offset and the offset based on
a specific point in time. The part that's a bit awkward with OffsetRequest
is that it's based on the last modified time of the log segment, which
makes it imprecise (precision is at the segment level, not message level)
and non-deterministic (last modified time may change). Another awkwardness
is that it supports returning a list of offsets after a specified
timestamp. We did that simply because timestamp was only at the segment
level then. So, our plan is to replace OffsetRequest with a new one. It
will give you the same functionality: find the first and the last offset
and the offset based on a specific point in time. It will just be better
since it's more precise and more deterministic. For your use case, it seems
that you don't care about message creation time. Then, it's possible for
you to configure the broker with the log append time. Whether this should
be default at the Kafka level is debatable, but it won't prevent your use
case.

For your suggestion on refactoring, I still want to understand how
necessary it is. Your main concerns so far seem to be:
(a) Impact on rolling log segments.
(b) Time-based index is not useful for me.

Item (a) is a good point. Thanks for that. We will fix it. Item (b), I have
given my view on this above. Are there any other things that you think that
having a time-based index will hurt?

Thanks,

Jun

On Fri, Aug 26, 2016 at 3:41 PM, Jan Filipiak 
wrote:


Hi Jun,

thanks for taking the time to answer on such a detailed level. You are
right, Log.fetchOffsetByTimestamp works; the comment is just confusing:
"// Get all the segments whose largest timestamp is smaller than target
timestamp", which apparently is not what takeWhile does (I am more on
the Java end of things, so I relied on the comment).

Regarding the frequent file rolling, I didn't think of log compaction, but
that indeed is a place where things can hit the fan pretty easily, especially
if you don't have many updates in there and you pass the timestamp along in
a kafka-streams application. Bootstrapping a new application then indeed
could produce quite a few old messages, kicking off this log rolling until a
recent message appears. I guess that makes it a practical issue again even
with the 7 days. Thanks for pointing that out! I'd like to see the appendTime
as the default. I am very happy that I have it in my back pocket, so I can
sleep tighter and not worry too much about someone accidentally doing
something dodgy on a weekend with our clusters.

Regarding the usefulness, you will not be able to sell it to me. I don't
know how people build applications with this ¯\_(ツ)_/¯ but I don't want to
see them.
Look at the error recovery with timestamp seek:
For fixing a bug, a user needs to stop the SP, truncate all his downstream
data perfectly based on their time window. Then restart and do the first
fetch based again on the perfect window timeout. From then on, he still has
NO clue whatsoever if messages that come later, now with an earlier
timestamp, need to go into the previous window or not. (Note that there is
>>>absolutely no<<< way to determine this in aggregated downstream windowed
stores). So the user is in
 even though he can seek, he
can&#x

Re: KIP-33 Opt out from Time Based indexing

2016-08-26 Thread Jan Filipiak
e could allow the 
compaction to happen sooner (since the active segment is never 
cleaned). One option is to change the default log.roll.hours to 
infinite and also document the impact on changing log.roll.hours. 
Jiangjie, what do you think?


For the second odd thing, the OffsetRequest is a legacy request. It's 
awkward to use and we plan to deprecate it over time. That's why we 
haven't changed the logic in serving OffsetRequest after KIP-33. The 
plan is to introduce a new OffsetRequest that will be exploiting the 
time based index. It's possible to have log segments with 
non-increasing largest timestamp. As you can see in 
Log.fetchOffsetsByTimestamp(), we simply iterate the segments in 
offset order and stop when we see the target timestamp.


For the third odd thing, one of the original reasons why the 
time-based index points to an offset instead of the file position is 
that it makes truncating the time index to an offset easier since the 
offset is in the index. Looking at the code, we could also store the 
file position in the time index and do truncation based on position, 
instead of offset. It probably has a slight advantage of consistency 
between the two indexes and avoiding another level of indirection when 
looking up the time index. Jiangjie, have we ever considered that?


The idea of log.message.timestamp.difference.max.ms is to prevent the 
timestamp in the published messages from drifting too far away from the 
current timestamp. The default value is infinite though.


Lastly, for the usefulness of time-based index, it's actually a 
feature that the community wanted and voted for, not just for 
Confluent customers. For example, being able to seek to an offset 
based on timestamp has been a frequently asked feature. This can be 
useful for at least the following scenarios: (1) If there is a bug in 
a consumer application, the user will want to rewind the consumption 
after fixing the logic. In this case, it's more convenient to rewind 
the consumption based on a timestamp. (2) In a multi data center 
setup, it's common for people to mirror the data from one Kafka 
cluster in one data center to another cluster in a different data 
center. If one data center fails, people want to be able to resume the 
consumption in the other data center. Since the offsets are not 
preserved between the two clusters through mirroring, being able to 
find a starting offset based on timestamp will allow the consumer to 
resume the consumption without missing any messages and also not 
replaying too many messages.
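
A rough sketch of how such a timestamp-to-offset lookup could work against a sparse time index, using the (T, offset) entry semantics from the KIP (any message with a timestamp greater than T comes after that offset). This is an illustration only, not the broker's implementation: the in-memory array standing in for the log, the class name, and the `everyN` sparsity knob are all assumptions.

```java
import java.util.Map;
import java.util.TreeMap;

final class TimeIndexSketch {
    // timestamps[i] is the timestamp of the message at offset i.
    private final long[] timestamps;
    // Sparse index: T -> offset, where T is the largest timestamp seen up
    // to and including that offset.
    private final TreeMap<Long, Long> index = new TreeMap<>();

    TimeIndexSketch(long[] timestamps, int everyN) {
        this.timestamps = timestamps.clone();
        long largest = Long.MIN_VALUE;
        for (int offset = 0; offset < timestamps.length; offset++) {
            if (timestamps[offset] > largest) {
                largest = timestamps[offset];
                // Only some offsets get an entry, to keep the index sparse.
                if (offset % everyN == 0) {
                    index.put(largest, (long) offset);
                }
            }
        }
    }

    // Returns the first offset (in log order) whose timestamp is at or
    // after the target, or -1 if there is none.
    long firstOffsetAtOrAfter(long targetTimestamp) {
        // Every message at or after the target has a timestamp greater than
        // this entry's T, so it must sit after the entry's offset.
        Map.Entry<Long, Long> hint = index.lowerEntry(targetTimestamp);
        int start = (hint == null) ? 0 : hint.getValue().intValue();
        for (int offset = start; offset < timestamps.length; offset++) {
            if (timestamps[offset] >= targetTimestamp) {
                return offset;
            }
        }
        return -1L;
    }
}
```

The index only narrows down where to start scanning. Because timestamps in the log need not be increasing, the scan itself still decides which message is returned.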


Thanks,

Jun


On Wed, Aug 24, 2016 at 5:05 PM, Jan Filipiak 
<jan.filip...@trivago.com> wrote:


Hey Jun,

I go and try again :), wrote the first one in quite a stressful
environment. The bottom line is that I, for our use cases, see a
too small use/effort ratio in this time index.
We do not bootstrap new consumers for key-less logs so frequently
and when we do it, they usually want everything (prod deployment)
or just start at the end (during development).
That caused quite some frustration. It would have been better if I could
just have turned it off and not bothered any more. Anyhow, in the
meantime I had to dig deeper into the inner workings
and the impacts are not as dramatic as I initially assumed. But it
still carries along some oddities I want to list here.

first odd thing:
Quote
---
Enforce time based log rolling

Currently time based log rolling is based on the creating time of
the log segment. With this KIP, the time based rolling would be
changed to based on the largest timestamp ever seen in a log
segment. A new log segment will be rolled out if current time is
greater than largest timestamp ever seen in the log segment +
log.roll.ms. When message.timestamp.type=CreateTime, user should set
max.message.time.difference.ms appropriately together with log.roll.ms
to avoid frequent log segment roll out.

---
Imagine a MirrorMaker falls behind and has a delay
of some time > log.roll.ms.
From my understanding, when no one else is producing to this
partition except the MirrorMaker, the broker will start rolling
on every append?
Just because you may be under a DoS attack and your application only
works in the remote location (also a good occasion for MM to fall
behind).
But checking the default values indicates that it should indeed
not become a problem, as log.roll.ms defaults
to ~>7 days.
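
The rolling rule from the quoted KIP text, and the lagging-MirrorMaker case, can be sketched as below. This follows the wording of the quote only (largest timestamp in the segment plus log.roll.ms); the class and method are hypothetical and not taken from the broker code.

```java
final class RollCheck {
    // Roll rule as worded in the quoted KIP text:
    // roll when now > largest timestamp seen in the segment + log.roll.ms.
    static boolean shouldRoll(long nowMs, long largestTimestampMs, long logRollMs) {
        return nowMs > largestTimestampMs + logRollMs;
    }

    public static void main(String[] args) {
        long day = 86_400_000L;
        long now = 1_472_000_000_000L;
        // Sole producer is a MirrorMaker lagging 8 days, log.roll.ms = 7 days:
        // every appended message is already "too old", so the check is true.
        System.out.println(shouldRoll(now, now - 8 * day, 7 * day));
        // Same setup with only one hour of lag: no roll.
        System.out.println(shouldRoll(now, now - 3_600_000L, 7 * day));
    }
}
```

With a roll interval of about 7 days the first case needs more than a week of lag, which is why the default makes the problem unlikely but not impossible.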


second odd thing:
Quote
---
A time index entry (/T/, /offset/) means that in this segment any
message whose timestamp is greater than /T/ come after /offset./


Re: KIP-33 Opt out from Time Based indexing

2016-08-24 Thread Jan Filipiak
s now feels weird to me. Gives me a 
feeling of complexity that I don't need and have a hard time figuring 
out how much other people can benefit from it. I hope that this feedback 
is useful and helps to understand my scepticism regarding this thing. 
There were some other oddities that I have a hard time recalling now. So 
I guess the index was built for a specific Confluent customer; will 
there be any blog post about their use case? Or can you share it?


Best Jan

On 24.08.2016 16:47, Jun Rao wrote:

Jan,

Thanks for the reply. I actually wasn't sure what your main concern on 
time-based rolling is. Just a couple of clarifications. (1) Time-based 
rolling doesn't control how long a segment will be retained for. For 
retention, if you use time-based, it will now be based on the 
timestamp in the message. If you use size-based, it works the same as 
before. Is your concern on time-based retention? If so, you can always 
configure the timestamp in all topics to be log append time, which 
will give you the same behavior as before. (2) The creation time of 
the segment is never exposed to the consumer and therefore is never 
preserved in MirrorMaker. In contrast, the timestamp in the message 
will be preserved in MirrorMaker. So, not sure what your concern on 
MirrorMaker is.


Jun

On Wed, Aug 24, 2016 at 5:03 AM, Jan Filipiak 
<jan.filip...@trivago.com> wrote:


Hi Jun,

I copy pasted this mail from the archive, as I somehow didn't
receive it per mail. I will still make some comments inline;
hopefully you can find them quickly enough, my apologies.

To make things more clear, you should also know that all messages
in our Kafka setup have a common way to access their timestamp
already (it's encoded in the value the same way always).
Sometimes this is a logical time (e.g. the same timestamp across many
different topics / partitions), say PHP request start time or the
like. So Kafka's internal timestamps are not really attractive
for us anyway currently.

I hope I can make a point and not waste your time.

Best Jan,

hopefully everything makes sense



Jan,

Currently, there is no switch to disable the time based index.

There are quite a few use cases of time based index.

1. From KIP-33's wiki, it allows us to do time-based retention accurately.
Before KIP-33, the time-based retention is based on the last modified time
of each log segment. The main issue is that last modified time can change
over time. For example, if a broker loses storage and has to re-replicate
all data, those re-replicated segments will be retained much longer since
their last modified time is more recent. Having a time-based index allows
us to retain segments based on the message time, not the last modified
time. This can also benefit KIP-71, where we want to combine time-based
retention and compaction.

/If you're short on disk space, one could try to get by with
retention.bytes/
or, as we did, ssh into the box and rm it, which worked quite well
when no one reads it.
Chuckles a little when it's read, but readers usually do an
auto.offset.reset
(they are too slow anyway if they are reading the last segments hrhr).

2. In KIP-58, we want to delay log compaction based on a configurable
amount of time. Time-based index allows us to do this more accurately.

/good point, seems reasonable/

3. We plan to add an api in the consumer to allow seeking to an offset
based on a timestamp. The time based index allows us to do this more
accurately and fast.

/Sure, I personally feel that you rarely want to do this. For
Camus, we used max.pull.historic.days (or similar) successfully
quite often. We just gave it an extra day and got what we wanted,
and for debugging my bisect tool works well enough. So these are
the 2 use cases we experienced already and found a decent way
around./

Now for the impact.

a. There is a slight change on how time-based rolling works. Before KIP-33,
rolling was based on the time when a segment was loaded in the broker.
After KIP-33, rolling is based on the time of the first message of a
segment. Not sure if this is your concern. In the common case, the two
behave more or less the same. The latter is actually more deterministic
since it's not sensitive to broker restarts.

/This is part of my main concern indeed. This is what scares me
and I preferred to just opt out, instead of reviewing all our
pipelines to check what's gonna happen when we put it live.
For example the MirrorMakers: if they want to preserve create time
from the source cluster and publish the same create time (which
they should do, if you don't encode your own timestamps and want
to have proper kafka-streams windowing). Then I am quite conc

Re: Re: KIP-33 Opt out from Time Based indexing

2016-08-24 Thread Jan Filipiak
ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L715
might end up with for (Index i : indexes) { i.shouldRoll(segment) }? which should 
already be easier.
If users don't want time based indexing, just don't put the time-based index in 
the Set then and everything should work like a charm.
RPC calls that work on the specific indexes would need to throw an exception of 
some kind.
Just an idea.
/
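
The idea above (a set of indexes, each asked whether the segment should roll, with the time-based index simply left out to opt out) could look roughly like this. Everything here is hypothetical: `SegmentIndex`, `RollPolicy` and the two-argument `shouldRoll` are illustration names, not Kafka classes, and the segment is reduced to two numbers.

```java
import java.util.ArrayList;
import java.util.List;

// One pluggable index; each index decides on its own whether the active
// segment should be rolled.
interface SegmentIndex {
    boolean shouldRoll(long nowMs, long largestTimestampMs);
}

final class RollPolicy {
    private final List<SegmentIndex> indexes = new ArrayList<>();

    // Registers an index; leaving the time-based one out is the opt-out.
    RollPolicy with(SegmentIndex index) {
        indexes.add(index);
        return this;
    }

    // The segment rolls as soon as any registered index asks for it.
    boolean shouldRoll(long nowMs, long largestTimestampMs) {
        for (SegmentIndex index : indexes) {
            if (index.shouldRoll(nowMs, largestTimestampMs)) {
                return true;
            }
        }
        return false;
    }
}
```

A time-based index would then be registered as something like `policy.with((now, largest) -> now > largest + rollMs)`, and a broker without it would never roll because of message timestamps.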
Jun




On 22.08.2016 09:24, Jan Filipiak wrote:

Hello everyone,

I stumbled across KIP-33 and the time-based index. While briefly 
checking the wiki and commits, I failed to find a way to opt out.
I saw it having quite some impact on when logs are rolled and was 
hoping not to have to deal with all of that. Is there a disable switch 
I overlooked?


Does anybody have a good use case where the time-based index comes in 
handy? I made a custom console consumer for myself
that can bisect a log based on time. It's just a quick probabilistic 
shot into the log but is sometimes quite useful for some debugging.


Best Jan




KIP-33 Opt out from Time Based indexing

2016-08-22 Thread Jan Filipiak

Hello everyone,

I stumbled across KIP-33 and the time-based index. While briefly 
checking the wiki and commits, I failed to find a way to opt out.
I saw it having quite some impact on when logs are rolled and was hoping 
not to have to deal with all of that. Is there a disable switch I 
overlooked?


Does anybody have a good use case where the time-based index comes in 
handy? I made a custom console consumer for myself
that can bisect a log based on time. It's just a quick probabilistic shot 
into the log but is sometimes quite useful for some debugging.


Best Jan


Re: Kafka Streams

2016-03-12 Thread Jan Filipiak

Hi,

I am very excited about all of this in general. Sadly I haven’t had the 
time to really take a deep look. One thing that is/was always difficult 
when resolving many-to-many relationships in table x table x table joins 
is the repartitioning that has to happen at some point.


From the documentation I saw this:

"The *keys* of data records determine the partitioning of data in both 
Kafka and Kafka Streams, i.e. how data is routed to specific partitions 
within topics."


This feels unnecessarily restrictive, as I can't currently imagine how to 
resolve many-to-many relationships with this. One can also emit every 
record to many partitions to make up for the lack of read replicas in Kafka, 
as well as use partitioning schemes that don't work like this (shards 
processing overlapping key spaces).


I would really love to hear your thoughts on these topics. Great work! 
Google grade technologies for everyone!

I <3 logs



On 10.03.2016 22:26, Jay Kreps wrote:

Hey all,

Lots of people have probably seen the ongoing work on Kafka Streams
happening. There is no real way to design a system like this in a vacuum,
so we put up a blog, some snapshot docs, and something you can download and
use easily to get feedback:

http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple

We'd love comments or thoughts from anyone...

-Jay





Re: Consuming "backwards"?

2015-11-09 Thread Jan Filipiak

Hi,

obviously this should be built differently IMHO (unless I fail to see 
something that prevents you from doing this).

When you realize you fall behind do this:

1. remember your current offset
2. get the latest offset
3. fork a process to replicate from the current offset +1 to the latest 
   one just fetched.
4. reset your offset to the latest +1
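
The four steps above boil down to splitting the log into a backfill range and a new live position. A minimal sketch, assuming "current offset" means the last offset already processed and "latest offset" means the last offset currently in the log; `CatchUpPlan` is a made-up name and no Kafka client calls are shown.

```java
final class CatchUpPlan {
    final long backfillFrom; // first offset the forked process must copy
    final long backfillTo;   // last offset the forked process must copy (inclusive)
    final long resumeAt;     // where the main consumer continues live

    private CatchUpPlan(long backfillFrom, long backfillTo, long resumeAt) {
        this.backfillFrom = backfillFrom;
        this.backfillTo = backfillTo;
        this.resumeAt = resumeAt;
    }

    // Steps 1-4: remember current, take latest, hand (current, latest] to
    // the forked process, and move the main consumer to latest + 1.
    static CatchUpPlan plan(long currentOffset, long latestOffset) {
        if (latestOffset < currentOffset) {
            throw new IllegalArgumentException("latest offset is behind current offset");
        }
        return new CatchUpPlan(currentOffset + 1, latestOffset, latestOffset + 1);
    }
}
```

For example, `CatchUpPlan.plan(100, 250)` hands offsets 101 through 250 to the forked process and lets the main consumer continue at 251, so the two ranges neither overlap nor leave a gap.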

I fail to see the reason why a topic change would be needed. If the 
downstream app can't handle the data, you are in trouble anyway, and if 
your message processing is the bottleneck, more partitions might also be 
an option. I would try not to do any tricks, find out what the lag 
is caused by, and then fix whatever causes the lag. It's 1 am in Germany, 
so there might be off-by-one errors in the algorithm above.


Best
Jan


On 04.11.2015 18:13, Otis Gospodnetić wrote:

This is an ancient thread, but I thought I'd point to
http://blog.sematext.com/2015/11/04/kafka-real-time-stream-multi-topic-catch-up-trick/
which gives a short description of how we ended up implementing this.
It seems to work well for us, but if there are better ways to do it, esp.
now with 0.9 around the corner, I'm all eyeballs!

Otis
--
Monitoring - Log Management - Alerting - Anomaly Detection
Solr & Elasticsearch Consulting Support Training - http://sematext.com/


On Fri, Dec 6, 2013 at 7:09 PM, Joe Stein  wrote:


yes, also you can read backwards on the stream if you do this in the
consumer

val maoList: Iterable[MessageAndOffset] = for(messageAndOffset <-
messageSet if(numMessagesConsumed < maxMessages)) yield messageAndOffset

for(messageAndOffset <- maoList.toList.reverse) {

this way every read is the latest before the next earliest so when you
fetch 18,19,20 you will see them coming in as 20,19,18

/***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop 
/


On Fri, Dec 6, 2013 at 7:02 PM, Steven Parkes 
wrote:


Right. If you're not reading contiguously, you need to remember the ranges
that you have/haven't read. As long as you do that, it all works out, I
think.

A kafka client always has to store the last offset it read. In the
simplest "both directions" case where you start with current and read in
both directions, you just need to remember the first offset you've read as
well.

On Dec 6, 2013, at 3:50 PM, Joe Stein  wrote:


you have to allow the fetchSize to be variable so in your example since the
new highwatermark is 12 and the last consumed message is 10

fetchSize = if (highwatermark - lastConsumedOffset < 3) highwatermark -
lastConsumedOffset else 3

the real trick though is missing messages having to keep track of more than
one index

so lets say now you have 7 more published (13,14,15,16,17,18,19)

you then read 17,18,19 (and while that happens 5 more are published ...
20,21,22,23,24)

now when you read 22,23,24 ... you have to keep track of not only 22 as the
last read so you scoop up 20 and 21 but also remember still 17 so you get
16,15,14,13

so it can be done with some fancy logic to manage the index and offsets
(and persist them)
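
The bookkeeping described above (newest messages first, while remembering every older range that is still unread) can be sketched with a stack of unread offset ranges. This is only an in-memory illustration under the thread's assumptions (offsets start at 1, batch size 3); `BackwardReader` is a hypothetical name, nothing is fetched from a broker, and the ranges are not persisted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class BackwardReader {
    private final int batchSize;
    // Unread ranges as {low, high} (inclusive), newest range on top.
    private final Deque<long[]> unread = new ArrayDeque<>();
    private long maxSeen; // highest offset ever covered by a range

    BackwardReader(long firstOffset, int batchSize) {
        this.maxSeen = firstOffset - 1;
        this.batchSize = batchSize;
    }

    // Returns the next batch in descending offset order, preferring the
    // newest unread range and falling back to older gaps afterwards.
    List<Long> poll(long highWatermark) {
        if (highWatermark > maxSeen) {
            unread.push(new long[] {maxSeen + 1, highWatermark});
            maxSeen = highWatermark;
        }
        List<Long> batch = new ArrayList<>();
        long[] top = unread.peek();
        if (top == null) {
            return batch;
        }
        while (batch.size() < batchSize && top[1] >= top[0]) {
            batch.add(top[1]);
            top[1]--;
        }
        if (top[1] < top[0]) {
            unread.pop(); // range fully consumed
        }
        return batch;
    }
}
```

Replaying the example from the mail: polls with high watermarks 10, 12, 19 and 24 return 10,9,8 then 12,11 then 19,18,17 then 24,23,22; further polls at 24 return 21,20 and then 16,15,14 and 13, with no offset delivered twice.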


On Fri, Dec 6, 2013 at 6:44 PM, Otis Gospodnetic <otis.gospodne...@gmail.com>
wrote:
Hi,

On Fri, Dec 6, 2013 at 6:32 PM, Steven Parkes 
wrote:


On Dec 6, 2013, at 2:03 PM, Otis Gospodnetic <otis.gospodne...@gmail.com>
wrote:


but I think the
problem is that each time we grab we could get some of the same messages we
already processed

Doesn't setting the fetchSize to "how far back we need to grab" handle
that?


I *think* it doesn't, but I'm wrong every day N times, so

I think this is what would happen:
1) imagine 10 messages in the broker m1 - m10
2) consumer grabs last N (=3): m8, m9, m10
3) while it's doing that and before consumer polls for more messages
producer publishes 2 more: m11 and m12
4) consumer now polls again. It asks broker for publisher offset and gets
the answer: 12
5) good, says consumer, let me then fetch everything after offset 12-3=9:
m10, m11, m12

Problem: consumer got m10 again, but it was already processed in 2).

No?  Please correct me if I'm wrong anywhere.

Thanks,
Otis
--
Performance Monitoring * Log Analytics * Search Analytics
Solr & Elasticsearch Support * http://sematext.com/







Re: log compaction scaling with ~100m messages

2015-10-08 Thread Jan Filipiak

Hi,

just want to pick this up again. You can always use more partitions to 
reduce the number of keys handled by a single broker and parallelize the 
compaction. So with sufficient number of machines and the ability to 
partition I don’t see you running into problems.


Jan

On 07.10.2015 05:34, Feroze Daud wrote:

hi!
We have a use case where we want to store ~100m keys in kafka. Is there any 
problem with this approach?
I have heard from some people using kafka that kafka has a problem when doing 
log compaction with that many keys.
Another topic might have around 10 different K/V pairs for each key in the 
primary topic. The primary topic's keyspace is approx of 100m keys. We would 
like to store this in kafka because we are doing a lot of stream processing on 
these messages, and want to avoid writing another process to recompute data 
from snapshots.
So, in summary:
primary topic: ~100m keyssecondary topic: ~1B keys
Is it feasible to use log compaction at such a scale of data?
Thanks
feroze.




Re: Hdfs fSshell getmerge

2015-07-24 Thread Jan Filipiak

Sorry wrong mailing list

On 24.07.2015 16:44, Jan Filipiak wrote:

Hello hadoop users,

I have an idea about a small feature for the getmerge tool. I recently 
needed to use the newline option -nl because the files I needed to merge 
simply didn't have one.
I was merging all the files from one directory and unfortunately this 
directory also included empty files, which effectively led to multiple 
newlines being appended after some files.

I needed to remove them manually afterwards.

In this situation it may be good to have another argument that 
allows skipping empty files. I just wrote down two changes one could try 
at the end. Do you guys consider this a good improvement to the 
command line tools?


Things one could try to implement this feature:

The call to IOUtils.copyBytes(in, out, getConf(), false); doesn't 
return the number of bytes copied, which would be convenient as one 
could skip appending the newline when 0 bytes were copied.

Or one would check the file size before.

Please let me know if you would consider this useful and worth a 
feature ticket in Jira.


Thank you
Jan




Hdfs fSshell getmerge

2015-07-24 Thread Jan Filipiak

Hello hadoop users,

I have an idea about a small feature for the getmerge tool. I recently 
needed to use the newline option -nl because the files I needed to merge 
simply didn't have one.
I was merging all the files from one directory and unfortunately this 
directory also included empty files, which effectively led to multiple 
newlines being appended after some files.

I needed to remove them manually afterwards.

In this situation it may be good to have another argument that allows 
skipping empty files. I just wrote down two changes one could try at the 
end. Do you guys consider this a good improvement to the command line 
tools?


Things one could try to implement this feature:

The call to IOUtils.copyBytes(in, out, getConf(), false); doesn't 
return the number of bytes copied, which would be convenient as one could 
skip appending the newline when 0 bytes were copied.

Or one would check the file size before.
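
The first idea can be sketched outside Hadoop with plain java.nio, where Files.copy does return the number of bytes copied. This is only an illustration of the proposed behaviour, not a patch against getmerge: MergeSkippingEmpty and merge are hypothetical names, and local files stand in for HDFS paths.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical stand-in for the merge loop; works on local files, not HDFS.
final class MergeSkippingEmpty {
    // Concatenates the inputs into out and appends '\n' only after files
    // that actually contributed bytes. Returns how many files were empty.
    static int merge(List<Path> inputs, OutputStream out) throws IOException {
        int skipped = 0;
        for (Path input : inputs) {
            long copied = Files.copy(input, out);  // number of bytes written to out
            if (copied == 0) {
                skipped++;                         // empty file: no newline appended
                continue;
            }
            out.write('\n');
        }
        return skipped;
    }
}
```

The second idea, checking the file size first, would replace the byte count with a size lookup before the copy; the result is the same for files that do not change during the merge.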

Please let me know if you would consider this useful and worth a 
feature ticket in Jira.


Thank you
Jan


Questions regarding Kafka-1477

2015-07-02 Thread Jan Filipiak

Hi,

just out of curiosity and because of Eugene's email, I browsed 
Kafka-1477 and it talks about SSL a lot. So I thought I might throw in 
this RFC: http://tools.ietf.org/html/rfc7568. It basically says to move away 
from SSL now and only do TLS. The title of the ticket still mentions TLS, 
but afterwards it's only SSL. I haven't looked at any patches or library 
code, so I can't really judge what's going on.


Further, I found people starting to talk about sendfile(2) TLS support, 
here for example: https://people.freebsd.org/~rrs/asiabsd_2015_tls.pdf. 
So maybe we can keep this door open so that at some point the kernel will 
be able to do TLS for us?






On 02.07.2015 22:24, eugene miretsky wrote:

HI,

There is some work being done on security in Kafka:
Confluence: https://cwiki.apache.org/confluence/display/KAFKA/Security
Jira: https://issues.apache.org/jira/browse/KAFKA-1682

It seems like the main blockers are KAFKA-1477, KAFKA-1691 and KAFKA-1690.

Is there an anticipated road map for when all the security features will be
done and merged in to trunk?




Re: Log compaction not working as expected

2015-06-17 Thread Jan Filipiak

Ah misread that sorry!

On 17.06.2015 14:26, Shayne S wrote:

Right, you can see I've got segment.ms set.  The trick is that they don't
actually roll over until something new arrives. If your topic is idle (not
receiving messages), it won't ever roll over to a new segment, and thus the
last segment will never be compacted.
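
The behaviour described here can be captured in a toy model: a roll is only evaluated when a message is appended, and only non-active segments are eligible for cleaning. This is a sketch of the observed behaviour under those two assumptions. ToyLog is a hypothetical class and does not mirror Kafka's log implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of segment rolling as described in this thread; not Kafka code.
final class ToyLog {
    private final long segmentMs;
    private final List<Long> segmentStartTimes = new ArrayList<>(); // one entry per segment

    ToyLog(long segmentMs) {
        this.segmentMs = segmentMs;
    }

    // A roll is only considered when a message arrives.
    void append(long nowMs) {
        boolean noSegmentYet = segmentStartTimes.isEmpty();
        if (noSegmentYet
                || nowMs - segmentStartTimes.get(segmentStartTimes.size() - 1) >= segmentMs) {
            segmentStartTimes.add(nowMs); // start a new active segment
        }
    }

    // Every segment except the last (active) one may be cleaned.
    // Elapsed time alone never changes this number.
    int cleanableSegments() {
        return Math.max(0, segmentStartTimes.size() - 1);
    }
}
```

In this model, three appends within the first seconds leave zero cleanable segments no matter how long the topic then sits idle; the first append after the segment timeout rolls a new segment and makes the old one cleanable.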

Thanks!
Shayne

On Wed, Jun 17, 2015 at 5:58 AM, Jan Filipiak wrote:


Hi,

you might want to have a look here:
http://kafka.apache.org/documentation.html#topic-config
_segment.ms_ and _segment.bytes_ should allow you to control the
time/size when segments are rolled.

Best
Jan


On 16.06.2015 14:05, Shayne S wrote:


Some further information, and is this a bug?  I'm using 0.8.2.1.

Log compaction will only occur on the non-active segments.  Intentional or
not, it seems that the last segment is always the active segment.  In other
words, an expired segment will not be cleaned until a new segment has been
created.

As a result, a log won't be compacted until new data comes in (per
partition). Does this mean I need to send the equivalent of a pig (
https://en.wikipedia.org/wiki/Pigging) through each partition in order to
force compaction?  Or can I force the cleaning somehow?

Here are the steps to recreate:

1. Create a new topic with a 5 minute segment.ms:

kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
--replication-factor 1 --partitions 1 --config cleanup.policy=compact
--config min.cleanable.dirty.ratio=0.01 --config segment.ms=300000

2. Repeatedly add messages with identical keys (3x):

echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

3. Wait 5+ minutes and confirm no log compaction.
4. Once satisfied, send a new message:

echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

5. Log compaction will occur soon after.

Is my use case of infrequent logs not supported? Is this intentional
behavior? It's unnecessarily challenging to target each partition with a
dummy message to trigger compaction.

Also, I believe there is another issue with logs originally configured
without a segment timeout that led to my original issue.  I still cannot
get those logs to compact.

Thanks!
Shayne






Re: Log compaction not working as expected

2015-06-17 Thread Jan Filipiak

Hi,

you might want to have a look here: 
http://kafka.apache.org/documentation.html#topic-config
_segment.ms_ and _segment.bytes_ should allow you to control the 
time/size when segments are rolled.


Best
Jan

On 16.06.2015 14:05, Shayne S wrote:

Some further information, and is this a bug?  I'm using 0.8.2.1.

Log compaction will only occur on the non-active segments.  Intentional or
not, it seems that the last segment is always the active segment.  In other
words, an expired segment will not be cleaned until a new segment has been
created.

As a result, a log won't be compacted until new data comes in (per
partition). Does this mean I need to send the equivalent of a pig (
https://en.wikipedia.org/wiki/Pigging) through each partition in order to
force compaction?  Or can I force the cleaning somehow?

Here are the steps to recreate:

1. Create a new topic with a 5 minute segment.ms:

kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
--replication-factor 1 --partitions 1 --config cleanup.policy=compact
--config min.cleanable.dirty.ratio=0.01 --config segment.ms=300000

2. Repeatedly add messages with identical keys (3x):

echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

3. Wait 5+ minutes and confirm no log compaction.
4. Once satisfied, send a new message:

echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

5. Log compaction will occur soon after.

Is my use case of infrequent logs not supported? Is this intentional
behavior? It's unnecessarily challenging to target each partition with a
dummy message to trigger compaction.

Also, I believe there is another issue with logs originally configured
without a segment timeout that led to my original issue.  I still cannot
get those logs to compact.

Thanks!
Shayne





Pulling Snapshots from Kafka, Log compaction last compact offset

2015-04-30 Thread Jan Filipiak

Hello Everyone,

I am quite excited about the recent example of replicating PostgreSQL 
changes to Kafka. My view of the log compaction feature had always been 
a very sceptical one, but now with its great potential exposed to the 
wide public, I think it's an awesome feature. Especially when pulling 
this data into HDFS as a snapshot, it is (IMO) a sqoop killer. So I want 
to thank everyone who had the vision of building these kinds of systems 
during a time I could not imagine those.


There is one open question that I would like people to help me with. 
When pulling a snapshot of a partition into HDFS using a camus-like 
application I feel the need to keep a Set of all keys read so far and 
stop as soon as I find a key that is already in my set. I use this as an 
indicator of how far the log compaction has progressed already and only 
pull up to this point. This works quite well as I do not need to keep 
the messages but only their keys in memory.
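
A minimal sketch of that stopping rule, with the partition modelled as a plain list of keys in offset order. SnapshotBoundary and compactedPrefixLength are hypothetical names, and the assumption that the first repeated key marks the end of the compacted part is the heuristic described above, not something Kafka guarantees.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper illustrating the "stop at the first repeated key" heuristic.
final class SnapshotBoundary {
    // Returns how many leading records to pull: reading stops at the first key
    // that has been seen before, so only keys (not messages) are held in memory.
    static int compactedPrefixLength(List<String> keysInOffsetOrder) {
        Set<String> seen = new HashSet<>();
        int count = 0;
        for (String key : keysInOffsetOrder) {
            if (!seen.add(key)) {
                break; // key already in the set: stop pulling here
            }
            count++;
        }
        return count;
    }
}
```

For keys a, b, c, b, d the helper returns 3, so the snapshot would contain a, b and c, and the second version of b would be left for a later pull.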


The question I want to raise with the community is:

How do you prevent pulling the same record twice (in different versions) 
and would it be beneficial if the "OffsetResponse" would also return the 
last offset that got compacted so far and the application would just 
pull up to this point?


Looking forward to some recommendations and comments.

Best
Jan



Re: Producer does not recognize new brokers

2015-04-13 Thread Jan Filipiak

Hey,

try to not have newlines \n in your jsonfile. I think the parser dies on 
those and then claims the file is empty
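
One way to rule out formatting problems is to generate the reassignment file instead of writing it by hand. The sketch below emits the same structure as the r.json quoted further down, on a single line. ReassignmentJson and singleLine are hypothetical names, one replica per partition is assumed, and the topic name is assumed to need no JSON escaping. Note also that the quoted file has a comma after its last entry, which strict JSON does not allow; whether the newlines or that comma trips the parser is not established in this thread.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical generator for a single-line reassignment file; not a Kafka tool.
final class ReassignmentJson {
    // partitionToBroker maps partition id -> the one broker that should hold it.
    static String singleLine(String topic, Map<Integer, Integer> partitionToBroker) {
        String entries = new TreeMap<Integer, Integer>(partitionToBroker).entrySet().stream()
                .map(e -> "{\"topic\":\"" + topic + "\",\"partition\":" + e.getKey()
                        + ",\"replicas\":[" + e.getValue() + "]}")
                .collect(Collectors.joining(",")); // separators only between entries
        return "{\"partitions\":[" + entries + "],\"version\":1}";
    }
}
```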


Best
Jan




On 13.04.2015 12:06, Ashutosh Kumar wrote:

Probably you should first try to generate proposed plan using --generate
option and then edit that if needed.
thanks


On Mon, Apr 13, 2015 at 3:12 PM, shadyxu  wrote:


Thanks guys. You are right, and here comes another problem:

I added new brokers 4, 5 and 6. Now I want to move partitions 3, 4 and
5(currently on broker 1, 2 and 3) of topic test to these brokers. I wrote
r.json file like this:

{"partitions":
[{"topic": "test","partition": 3,"replicas": [4]},
{"topic":"test","partition":4,"replicas":[5]},
{"topic":"test","partition":5,"replicas":[6]},],
"version":1}

and then ran:

 bin/kafka-reassign-partitions.sh --zookeeper [some-kafka-address]
--reassignment-json-file r.json --execute

Kafka gave me this error message:

 kafka.common.AdminCommandFailedException: Partition reassignment data
file r.json is empty

I googled, and it seems Kafka parsed the json file but found that no
partitions needed to be removed. Was my json file not properly configured?

2015-04-13 14:00 GMT+08:00 Ashutosh Kumar :


I think you need to re balance the cluster.
something like

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181
--topics-to-move-json-file topics-to-move.json --broker-list "5,6"
--generate


On Mon, Apr 13, 2015 at 11:22 AM, shadyxu  wrote:


I added several new brokers to the cluster, there should've been a rebalance
but it seemed that the producer was not aware of the new brokers. Data kept
being sent to the old brokers and there were no partitions on the new
brokers.

I configured the old brokers to the producer and did not restart the
producer or add the new brokers to the configuration.

What may be the problems?





Re: High CPU usage of Crc32 on Kafka broker

2015-02-22 Thread Jan Filipiak
I just want to bring up that idea of no server side de/recompression 
again. Features like KAFKA-1499 seem to steer the project into a different 
direction and the fact that tickets like KAFKA-845 are not getting much 
attention gives the same impression. This is something my head keeps 
spinning around almost 24/7 recently.


The problem I see is that CPUs are not the cheapest part of a new 
server and if you can spare a gigahertz or some cores by just making 
sure your configs are the same across all producers I would always opt 
for the operational overhead instead of the bigger servers. I think this 
will usually decrease the TCO of kafka installations.


I am currently not familiar enough with the codebase to judge if server 
side decompression happens before the acknowledgement. If so, sparing 
de/recompression would also let the broker respond some milliseconds 
faster.


Those are my thoughts about server side de/recompression. It would be 
great if I could get some responses and thoughts back.


Jan



On 07.11.2014 00:23, Jay Kreps wrote:

I suspect it is possible to save and reuse the CRCs though it might be a
bit of an invasive change. I suspect the first usage is when we are
checking the validity of the messages and the second is from when we
rebuild the compressed message set (I'm assuming you guys are using
compression because I think we optimize this out otherwise). Technically I
think the CRCs stay the same.
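
The "save and reuse" idea can be sketched as a message wrapper that computes its checksum once and hands the cached value to every later caller. This is an illustration only: ChecksummedMessage is a hypothetical class, it uses java.util.zip.CRC32 rather than kafka.utils.Crc32, and it says nothing about how invasive the change would be in the broker.

```java
import java.util.zip.CRC32;

// Hypothetical wrapper that caches the CRC of an immutable payload.
final class ChecksummedMessage {
    private final byte[] payload;
    private long cachedCrc = -1;   // CRC32 values are never negative, so -1 means "not computed"
    private int computations = 0;  // counts real CRC passes, for demonstration only

    ChecksummedMessage(byte[] payload) {
        this.payload = payload.clone(); // defensive copy so the cached CRC stays valid
    }

    // First call computes the CRC; later calls (e.g. validation, then
    // offset assignment) reuse the stored value.
    long crc() {
        if (cachedCrc < 0) {
            CRC32 crc32 = new CRC32();
            crc32.update(payload, 0, payload.length);
            cachedCrc = crc32.getValue();
            computations++;
        }
        return cachedCrc;
    }

    int computations() {
        return computations;
    }
}
```

Calling crc() twice on the same message runs the checksum loop once, which is the saving being discussed for the two call sites in Log.append().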

An alternative approach, though, would be working to remove the need for
recompression entirely on the broker side by making the offsets in the
compressed message relative to the base offset of the message set. This is
a much more invasive change but potentially better as it would also remove
the recompression done on the broker which is also CPU heavy.

-Jay

On Thu, Nov 6, 2014 at 2:36 PM, Allen Wang wrote:


Sure. Here is the link to the screen shot of jmc with the JTR file loaded:

http://picpaste.com/fligh-recorder-crc.png



On Thu, Nov 6, 2014 at 2:12 PM, Neha Narkhede wrote:


Allen,

Apache mailing lists don't allow attachments. Could you please link to a
pastebin or something?

Thanks,
Neha

On Thu, Nov 6, 2014 at 12:02 PM, Allen Wang wrote:


After digging more into the stack trace obtained from flight recorder (which
is attached), it seems that Kafka (0.8.1.1) can optimize the usage of Crc32.
The stack trace shows that Crc32 is invoked twice from Log.append(). The
first is from line number 231:

val appendInfo = analyzeAndValidateMessageSet(messages)

The second time is from line 252 in the same function:

validMessages = validMessages.assignOffsets(offset, appendInfo.codec)

If one of the Crc32 invocations can be eliminated, we are looking at saving
at least 7% of CPU usage.

Thanks,
Allen




On Wed, Nov 5, 2014 at 6:32 PM, Allen Wang  wrote:


Hi,

Using flight recorder, we have observed high CPU usage of CRC32
(kafka.utils.Crc32.update()) on Kafka broker. It uses as much as 25% of CPU
on an instance. Tracking down stack trace, this method is invoked by
ReplicaFetcherThread.

Is there any tuning we can do to reduce this?

Also on the topic of CPU utilization, we observed that overall CPU
utilization is proportional to AllTopicsBytesInPerSec metric. Does this
metric include incoming replication traffic?

Thanks,
Allen






Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Jan Filipiak

Hello Everyone,

I would very much appreciate it if someone could provide me a real world 
example where it is more convenient to implement the serializers instead 
of just making sure to provide byte arrays.


The code we came up with explicitly avoids the serializer api. I think 
it is common understanding that if you want to transport data you need 
to have it as a bytearray.


If at all I personally would like to have a serializer interface that 
takes the same types as the producer


public interface Serializer<K, V> extends Configurable {
    public byte[] serializeKey(K data);
    public byte[] serializeValue(V data);
    public void close();
}

this would avoid long serialize implementations with branches like 
"switch(topic)" or "if(isKey)". Further serializer per topic makes more 
sense in my opinion. It feels natural to have a one to one relationship 
from types to topics or at least only a few partition per type. But as 
we inherit the type from the producer we would have to create many 
producers. This would create additional unnecessary connections to the 
brokers. With the serializers we create a one type to all topics 
relationship and the only type that satisfies that is the bytearray or 
Object. Am I missing something here? As said in the beginning I would 
like to see the use case that really benefits from using the serializers. I 
think in theory they sound great but they cause real practical issues 
that may lead users to wrong decisions.
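
To make the proposed shape concrete, here is a sketch of one implementation for a single topic's types, with no topic or isKey branching. KeyValueSerializer is a self-contained stand-in for the interface suggested above (Kafka's Configurable is left out so the example compiles on its own), and UserEventSerializer with its Long key and String value is purely hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stand-in for the interface proposed in this mail; Configurable is omitted
// to keep the sketch self-contained.
interface KeyValueSerializer<K, V> {
    byte[] serializeKey(K data);
    byte[] serializeValue(V data);
    void close();
}

// Hypothetical serializer for a topic keyed by user id with a text payload.
final class UserEventSerializer implements KeyValueSerializer<Long, String> {
    @Override
    public byte[] serializeKey(Long userId) {
        // 8-byte big-endian encoding of the id
        return ByteBuffer.allocate(Long.BYTES).putLong(userId).array();
    }

    @Override
    public byte[] serializeValue(String event) {
        return event.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }
}
```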


-1 for putting the serializers back in.

Looking forward to replies that can show me the benefit of serializers 
and especially how the type => topic relationship can be handled nicely.

Best
Jan



On 25.11.2014 02:58, Jun Rao wrote:

Hi, Everyone,

I'd like to start a discussion on whether it makes sense to add the
serializer api back to the new java producer. Currently, the new java
producer takes a byte array for both the key and the value. While this api
is simple, it pushes the serialization logic into the application. This
makes it hard to reason about what type of data is being sent to Kafka and
also makes it hard to share an implementation of the serializer. For
example, to support Avro, the serialization logic could be quite involved
since it might need to register the Avro schema in some remote registry and
maintain a schema cache locally, etc. Without a serialization api, it's
impossible to share such an implementation so that people can easily reuse.
We sort of overlooked this implication during the initial discussion of the
producer api.

So, I'd like to propose an api change to the new producer by adding back
the serializer api similar to what we had in the old producer. Specifically,
the proposed api changes are the following.

First, we change KafkaProducer to take generic types K and V for the key
and the value, respectively.

public class KafkaProducer<K,V> implements Producer<K,V> {

 public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback
callback);

 public Future<RecordMetadata> send(ProducerRecord<K,V> record);
}

Second, we add two new configs, one for the key serializer and another for
the value serializer. Both serializers will default to the byte array
implementation.

public class ProducerConfig extends AbstractConfig {

 .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
"org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
KEY_SERIALIZER_CLASS_DOC)
 .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
"org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
VALUE_SERIALIZER_CLASS_DOC);
}

Both serializers will implement the following interface.

public interface Serializer<T> extends Configurable {
 public byte[] serialize(String topic, T data, boolean isKey);

 public void close();
}

This is more or less the same as what's in the old producer. The slight
differences are (1) the serializer now only requires a parameter-less
constructor; (2) the serializer has a configure() and a close() method for
initialization and cleanup, respectively; (3) the serialize() method
additionally takes the topic and an isKey indicator, both of which are
useful for things like schema registration.

The detailed changes are included in KAFKA-1797. For completeness, I also
made the corresponding changes for the new java consumer api as well.

Note that the proposed api changes are incompatible with what's in the
0.8.2 branch. However, if those api changes are beneficial, it's probably
better to include them now in the 0.8.2 release, rather than later.

I'd like to discuss mainly two things in this thread.
1. Do people feel that the proposed api changes are reasonable?
2. Are there any concerns of including the api changes in the 0.8.2 final
release?

Thanks,

Jun