[jira] [Commented] (KAFKA-6989) Support Async Processing in Streams

2018-09-24 Thread Virgil Palanciuc (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625477#comment-16625477
 ] 

Virgil Palanciuc commented on KAFKA-6989:
-

num.stream.threads just allows you to process more than one partition in 
parallel within a single process. Processing is still synchronous, in that 
you can't have multiple messages from the same partition processed in 
parallel. 

TBH, the entire philosophy of Kafka Streams is that you increase parallelism 
by increasing the number of partitions - and this works well when the work 
you need to do is not massively IO-bound (e.g. you don't need to call an 
external/third-party HTTP endpoint). It is possible to do this "manually" by 
launching an async task and pretending that you have finished processing your 
message, but it is cumbersome and error-prone (e.g. at the end of processing 
you need to use your own Kafka producer to write back the result, and there 
are some pitfalls/misconfigurations that can happen there). This ticket is 
about adding this sort of async processing as a "first-class" stream 
processor in Kafka Streams. 
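To make the "manual" pattern concrete, here is a plain-JDK sketch of it (not Kafka Streams API - the thread pool, the uppercase "remote call", and the queue standing in for the hand-rolled producer are all hypothetical): the slow work is handed to an executor and control returns immediately, so the record "looks" processed while the real work, and the offset it should gate, is still in flight.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

// Sketch of the "manual" async pattern: hand the slow call to a pool,
// pretend the record is done, and forward the result through a separate,
// hand-managed producer (modelled here as a queue).
public class ManualAsyncSketch {
    static List<String> processAll(List<String> records) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Stand-in for the separate Kafka producer that writes results back.
        BlockingQueue<String> resultSink = new LinkedBlockingQueue<>();
        for (String rec : records) {
            // Simulated high-latency external call (e.g. an HTTP endpoint);
            // the stream thread moves on before it completes.
            CompletableFuture.supplyAsync(rec::toUpperCase, pool)
                             .thenAccept(resultSink::add);
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new ArrayList<>(resultSink);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processAll(List.of("a", "b", "c")));
    }
}
```

The awkward part the comment alludes to is exactly the gap between `thenAccept` firing and the offset being committed: nothing here stops the stream from committing before `resultSink` has been written.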

> Support Async Processing in Streams
> ---
>
> Key: KAFKA-6989
> URL: https://issues.apache.org/jira/browse/KAFKA-6989
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> Today Kafka Streams uses a single-thread-per-task architecture to achieve 
> embarrassing parallelism and good isolation. However, there are a couple of 
> scenarios where async processing may be preferable:
> 1) External resource access or heavy IO with high latency. Suppose you need 
> to access a remote REST API, read/write to an external store, or do a heavy 
> disk IO operation that may result in high latency. The current threading 
> model would block any other records behind this record, waiting on the 
> remote call/IO to finish.
> 2) Robust failure handling with retries. Imagine the app-level processing of 
> a (non-corrupted) record fails (e.g. the user attempted an RPC to an 
> external system, and the call failed), and failed records are moved into a 
> separate "retry" topic. How can you process such failed records in a scalable 
> way? For example, imagine you need to implement a retry policy such as "retry 
> with exponential backoff". Here you have the problems that 1. you can't 
> really pause processing a single record, because this would pause the 
> processing of the full stream (bottleneck!), and 2. there is no 
> straightforward way to "sort" failed records based on their "next retry 
> time" (think: priority queue).
> 3) Delayed processing. One use case is delaying re-processing (e.g. "delay 
> re-processing this event for 5 minutes") as mentioned in 2); another is 
> implementing a scheduler, e.g. doing some additional operations later based 
> on a processed record. Zalando Dublin, for example, are implementing a 
> distributed web crawler this way. Note that although this feature can be 
> handled in punctuation, it is not well aligned with our current 
> offset-committing behavior, which always advances the offset once the record 
> has finished traversing the topology.
> I'm thinking of two options to support this feature:
> 1. Make the commit() mechanism more customizable so that users can implement 
> multi-threaded processing themselves: users can always do async processing 
> in the Processor API by spawning a thread pool, e.g., but the key is that 
> the offset to be committed should only be advanced once such async 
> processing is done. This is a light-weight approach: we provide all the 
> pieces and tools, and users stack them up to build their own LEGOs.
> 2. Provide a general API to do async processing in the Processor API, and 
> take care of offset committing internally. This is a heavy-weight approach: 
> the API may not cover all async scenarios, but it is an easy way to cover 
> the majority of the remaining scenarios, and users do not need to worry 
> about internal implementation details such as offsets and fault tolerance.
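The "priority queue" idea in point 2 of the description can be sketched in plain Java, independent of any Streams API (the FailedRecord shape and the base delay are hypothetical): failed records are ordered by a next-retry timestamp computed with exponential backoff, so the head of the queue is always the record due soonest.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Sketch of "sort failed records by next retry time" with exponential
// backoff; FailedRecord is an illustrative stand-in for a failed message.
public class RetryQueueSketch {
    record FailedRecord(String key, int attempt, long nextRetryAtMs) {}

    // Exponential backoff: base * 2^attempt.
    static long backoffMs(int attempt, long baseMs) {
        return baseMs << attempt;
    }

    public static void main(String[] args) {
        PriorityQueue<FailedRecord> retries =
            new PriorityQueue<>(Comparator.comparingLong(FailedRecord::nextRetryAtMs));
        long now = 0;
        retries.add(new FailedRecord("a", 3, now + backoffMs(3, 100))); // due at 800
        retries.add(new FailedRecord("b", 0, now + backoffMs(0, 100))); // due at 100
        retries.add(new FailedRecord("c", 1, now + backoffMs(1, 100))); // due at 200
        // The head is always the record due soonest, regardless of insert order.
        System.out.println(retries.poll().key()); // "b"
    }
}
```

The difficulty the ticket raises is not the queue itself but where it lives: consuming a "retry" topic in offset order gives no such ordering, which is why some form of async scheduling is needed.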



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7269) KStream.merge is not documented

2018-08-10 Thread Virgil Palanciuc (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16575845#comment-16575845
 ] 

Virgil Palanciuc commented on KAFKA-7269:
-

Might I suggest adding "union" as an alternative to "merge"? "union" is the 
Spark operator, and it's also the mathematical operator that makes the most 
sense, IMO.

> KStream.merge is not documented
> ---
>
> Key: KAFKA-7269
> URL: https://issues.apache.org/jira/browse/KAFKA-7269
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0
>Reporter: John Roesler
>Priority: Major
>  Labels: beginner, newbie
>
> If I understand the operator correctly, it should be documented as a 
> stateless transformation at 
> https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#stateless-transformations





[jira] [Comment Edited] (KAFKA-6989) Support Async Processing in Streams

2018-06-04 Thread Virgil Palanciuc (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500596#comment-16500596
 ] 

Virgil Palanciuc edited comment on KAFKA-6989 at 6/4/18 5:48 PM:
-

It seems to me that option 2 is a false hope... I've never seen a case in 
real-life production where "users do not need to worry about internal 
implementation details" - except in cases where the underlying technology is 
grossly under-utilized. If you push it to the limit in any way, you start to 
care very much about the internal implementation details. 

Since this is very much a feature for users who care to push things to the 
limit (synchronous processing is not enough), I'd vote for option 1. And then 
maybe implement option 2 based on it - but start by delivering option 1. 

One way (not discussed in the ticket) to potentially handle this is to 
schedule a punctuator from the "process" task, then cancel it and schedule 
the next punctuator, and so on - this effectively achieves the "retry with 
backoff" functionality, at the cost of creating/scheduling a lot of 
punctuators (so it may not be very efficient). But maybe we can improve the 
schedule/punctuator API to make this easier. We'd also need to modify the 
offset management in the tasks, though - this might be the trickier part.
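As a rough sketch of that cancel-and-reschedule idea, here is a plain-JDK version using a one-shot scheduler as a stand-in for ProcessorContext.schedule() (every name here is illustrative, not the Streams API): each "punctuator" firing retries the work and, on failure, schedules the next punctuator with a doubled interval.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of "schedule a punctuator, cancel it, schedule the next one" for
// retry-with-backoff. A one-shot ScheduledExecutorService task plays the
// role of a punctuator; rescheduling with a doubled delay replaces the
// cancel-and-reschedule dance.
public class BackoffPunctuatorSketch {
    static int retryWithBackoff(ScheduledExecutorService sched,
                                java.util.function.IntPredicate attemptSucceeds,
                                long initialDelayMs, int maxAttempts) throws Exception {
        CompletableFuture<Integer> done = new CompletableFuture<>();
        AtomicInteger attempt = new AtomicInteger(0);
        Runnable punctuator = new Runnable() {
            long delay = initialDelayMs;
            public void run() {
                int n = attempt.incrementAndGet();
                if (attemptSucceeds.test(n) || n >= maxAttempts) {
                    done.complete(n);     // success, or give up
                } else {
                    delay *= 2;           // exponential backoff
                    sched.schedule(this, delay, TimeUnit.MILLISECONDS);
                }
            }
        };
        sched.schedule(punctuator, initialDelayMs, TimeUnit.MILLISECONDS);
        return done.get(5, TimeUnit.SECONDS);  // number of attempts used
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService sched = Executors.newSingleThreadScheduledExecutor();
        // Succeeds on the 3rd attempt: punctuators fire at ~1ms, ~3ms, ~7ms.
        System.out.println(retryWithBackoff(sched, n -> n == 3, 1, 10));
        sched.shutdown();
    }
}
```

Note what the sketch cannot show: nothing here holds back the offset commit while retries are pending, which is exactly the offset-management change flagged above as the trickier part.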

I'm willing to take a stab at implementing this and submitting a PR - is 
there anybody who can offer guidance / discuss the preferred approach prior 
to implementation?


was (Author: virgilp):
It seems to me that option 2 is a false-hope... i've never seen the case in 
real-life production that "users do not need to worry about internal 
implementation details" - except in cases where the underlying technology is 
grossly under-utilized. If you push it to the limit an any way, you start to 
care very much about the internal implementation details. 

Since this is very much a feature for users who care to push things to the 
limit (synchronous processing is not enough), I'd vote for option 1. And then 
maybe implement 2 based on it - but start by delivering option 1. 

One way (that is not discussed in the ticket) to (potentially) handle this is 
to schedule a punctuator from the "process" task, and then cancel it & schedule 
the next punctuator, and so on - this effectively achieves the "retry with 
backoff" functionality, at the cost of creating/scheduling a lot of punctuators 
(so it may not be very efficient). But maybe we can improve the 
schedule/punctuator API to allow achieving this. We'd also need to modify the 
offset management in the tasks though - this might be the trickier part.

I'm willing to take a stab at implementing this & submitting a PR - is there 
anybody who can offer guidance/ discuss the preferred approach, prior to 
implementation?

> Support Async Processing in Streams
> ---
>
> Key: KAFKA-6989
> URL: https://issues.apache.org/jira/browse/KAFKA-6989
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Today Kafka Streams uses a single-thread-per-task architecture to achieve 
> embarrassing parallelism and good isolation. However, there are a couple of 
> scenarios where async processing may be preferable:
> 1) External resource access or heavy IO with high latency. Suppose you need 
> to access a remote REST API, read/write to an external store, or do a heavy 
> disk IO operation that may result in high latency. The current threading 
> model would block any other records behind this record, waiting on the 
> remote call/IO to finish.
> 2) Robust failure handling with retries. Imagine the app-level processing of 
> a (non-corrupted) record fails (e.g. the user attempted an RPC to an 
> external system, and the call failed), and failed records are moved into a 
> separate "retry" topic. How can you process such failed records in a scalable 
> way? For example, imagine you need to implement a retry policy such as "retry 
> with exponential backoff". Here you have the problems that 1. you can't 
> really pause processing a single record, because this would pause the 
> processing of the full stream (bottleneck!), and 2. there is no 
> straightforward way to "sort" failed records based on their "next retry 
> time" (think: priority queue).
> 3) Delayed processing. One use case is delaying re-processing (e.g. "delay 
> re-processing this event for 5 minutes") as mentioned in 2); another is 
> implementing a scheduler, e.g. doing some additional operations later based 
> on a processed record. Zalando Dublin, for example, are implementing a 
> distributed web crawler this way. Note that although this feature can be 
> handled in punctuation, it is not well aligned with our current 
> offset-committing behavior, which always advances the offset once the record 
> has finished traversing the topology.
