[jira] [Commented] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data

2019-01-22 Thread Del Bao (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16749171#comment-16749171 ]

Del Bao commented on KAFKA-7432:


A real-world scenario to back this use case:

An event logging topic has 60 partitions. One of its many downstream consumers is a
data materialization service: it consumes each event message, makes parallel RPC
calls to several external data sources to fetch signals related to the event, and
then writes to a downstream sink (data store). The processing time per message is
about 4 ms, i.e., 250 messages per second per partition, or roughly 15k messages
per second across the topic. The producing rate is far higher than that, which
causes huge delays.

One solution is to repartition the upstream topic, but in a corporate setting there
are too many dependencies on it, so that is not easy. Micro-batching could work
better: we process a batch of events and make a single RPC call for the whole batch.

We can certainly handle the micro-batching at the application level, but it would be
better to have this API in Kafka Streams itself.

> API Method on Kafka Streams for processing chunks/batches of data
> ------------------------------------------------------------------
>
> Key: KAFKA-7432
> URL: https://issues.apache.org/jira/browse/KAFKA-7432
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: sam
> Priority: Major
>
> For many situations in Big Data it is preferable to work with a small buffer
> of records at a go, rather than one record at a time.
> The natural example is calling some external API that supports batching for
> efficiency.
> How can we do this in Kafka Streams? I cannot find anything in the API that
> looks like what I want.
> So far I have:
> {{builder.stream[String, String]("my-input-topic").mapValues(externalApiCall).to("my-output-topic")}}
> What I want is:
> {{builder.stream[String, String]("my-input-topic").batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")}}
> In Scala and Akka Streams the function is called {{grouped}} or {{batch}}. In
> Spark Structured Streaming we can do
> {{mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))}}.
>
> https://stackoverflow.com/questions/52366623/how-to-process-data-in-chunks-batches-with-kafka-streams





[jira] [Commented] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data

2018-12-17 Thread sam (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16722855#comment-16722855 ]

sam commented on KAFKA-7432:


[~Yohan123] This ticket is not really about "microbatching"; perhaps you could
call it "nanobatching", since unlike Spark these batches are meant to be very
small.

Kafka itself is technically always nanobatching anyway, since you do not want
to ack messages one by one; that would be very inefficient. Typically, when you
use the lower-level KafkaConsumer API, you process very small batches of data.
I would hazard a guess (without reading the code) that this is also how Kafka
Streams is implemented.
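
To make that concrete, here is a minimal poll loop against the plain KafkaConsumer
(broker address, group id, and topic name are placeholders): {{poll()}} already
hands the application a small batch of records, and the commit acknowledges the
whole batch at once rather than record by record.
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NanoBatchPollLoop {
  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("group.id", "nanobatch-demo");          // placeholder group id
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-input-topic"));
      while (true) {
        // poll() returns a whole (small) batch, bounded by max.poll.records
        final ConsumerRecords<String, String> batch =
            consumer.poll(Duration.ofMillis(100));
        for (final ConsumerRecord<String, String> record : batch) {
          // process one record of the "nano-batch"
        }
        consumer.commitSync(); // commit once per batch, not once per record
      }
    }
  }
}
{code}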






[jira] [Commented] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data

2018-12-16 Thread Richard Yu (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16722635#comment-16722635 ]

Richard Yu commented on KAFKA-7432:
---

Hi, I just want to point out something here.

What Kafka currently supports is continuous processing, which Spark implemented
only recently (in Structured Streaming). In contrast, what this ticket suggests
is micro-batch processing, in which data is sent in batches. In some data
streaming circles, continuous processing is considered the best option for
sending data, and micro-batching an older technique.

I don't know whether we need to implement this particular option, especially
since overall latency for micro-batching is higher than for continuous
processing.

> API Method on Kafka Streams for processing chunks/batches of data
> -
>
> Key: KAFKA-7432
> URL: https://issues.apache.org/jira/browse/KAFKA-7432
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: sam
>Priority: Major
>
> For many situations in Big Data it is preferable to work with a small buffer 
> of records at a go, rather than one record at a time.
> The natural example is calling some external API that supports batching for 
> efficiency.
> How can we do this in Kafka Streams? I cannot find anything in the API that 
> looks like what I want.
> So far I have:
> {{builder.stream[String, String]("my-input-topic") 
> .mapValues(externalApiCall).to("my-output-topic")}}
> What I want is:
> {{builder.stream[String, String]("my-input-topic") .batched(chunkSize = 
> 2000).map(externalBatchedApiCall).to("my-output-topic")}}
> In Scala and Akka Streams the function is called {{grouped}} or {{batch}}. In 
> Spark Structured Streaming we can do 
> {{mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))}}.
>  
>  
> https://stackoverflow.com/questions/52366623/how-to-process-data-in-chunks-batches-with-kafka-streams





[jira] [Commented] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data

2018-09-25 Thread Matthias J. Sax (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627901#comment-16627901 ]

Matthias J. Sax commented on KAFKA-7432:


As you point out that you don't care about sync vs. async, it might be valid to
keep both tickets separate and only link them as related. Also note that you can
build sync batch calls manually, using a `transform()` with a state store: for
each input record, you put it into the state store; once the store has
accumulated enough records, you get all records from the store and make a sync
call to the external system with all of them. Afterwards, you forward whatever
data you want to forward and, finally, delete all records from the store.
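
A minimal sketch of this pattern against the `Transformer` API, as one might
write it: the store name, batch size, and {{callExternalApi()}} are hypothetical
placeholders, and the sketch assumes keys are unique within a batch (with
duplicate keys you would buffer under an offset-based key instead).
{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class BatchingTransformer
    implements Transformer<String, String, KeyValue<String, String>> {

  private static final int BATCH_SIZE = 2000; // hypothetical chunk size

  private ProcessorContext context;
  private KeyValueStore<String, String> store;

  @Override
  @SuppressWarnings("unchecked")
  public void init(final ProcessorContext context) {
    this.context = context;
    // "batch-store" must be registered in the topology and attached to this transformer
    this.store = (KeyValueStore<String, String>) context.getStateStore("batch-store");
  }

  @Override
  public KeyValue<String, String> transform(final String key, final String value) {
    // 1. buffer the record (assumes keys are unique within a batch)
    store.put(key, value);
    if (store.approximateNumEntries() >= BATCH_SIZE) {
      // 2. drain all buffered records
      final List<KeyValue<String, String>> batch = new ArrayList<>();
      try (final KeyValueIterator<String, String> it = store.all()) {
        while (it.hasNext()) {
          batch.add(it.next());
        }
      }
      // 3. one sync call to the external system with the whole batch,
      //    then forward each result downstream
      for (final KeyValue<String, String> result : callExternalApi(batch)) {
        context.forward(result.key, result.value);
      }
      // 4. clear the buffer
      for (final KeyValue<String, String> kv : batch) {
        store.delete(kv.key);
      }
    }
    return null; // all output is emitted via context.forward()
  }

  // placeholder for the real batched RPC
  private List<KeyValue<String, String>> callExternalApi(
      final List<KeyValue<String, String>> batch) {
    return batch;
  }

  @Override
  public void close() {}
}
{code}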






[jira] [Commented] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data

2018-09-25 Thread sam (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627144#comment-16627144 ]

sam commented on KAFKA-7432:


[~mjsax] [~vvcephei] OK, let's merge them if you think that's a good idea. I
guess they both come under "performance improvements".

As long as it's clear that the intent here is to be able to batch data because an
*external API accepts batches of data*, which is more efficient than calling the
API once per record. Whether this happens async or not was not my concern, but
yes, if it could also happen async, that would be cool.






[jira] [Commented] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data

2018-09-24 Thread Matthias J. Sax (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626682#comment-16626682 ]

Matthias J. Sax commented on KAFKA-7432:


I tend to agree with John that batching async calls (if possible) would be an
implementation detail (maybe exposed via some configs, if necessary). At the
least, we should consider this in the design discussion; if it does not fit, we
can still work on two independent features.






[jira] [Commented] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data

2018-09-24 Thread John Roesler (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625919#comment-16625919 ]

John Roesler commented on KAFKA-7432:
---

Hi [~sams],

This sounds like a valuable optimization. 

I agree that it's not the same request, but it seems like we'll get the best
final result by considering this and KAFKA-6989 together in one design. Would it
be OK with you to move this request into KAFKA-6989 for that reason? Then we
could be sure that any KIP coming out would consider both batching and async.

 

In case you're looking for an immediate solution, I'd recommend using the 
Processor API to define a custom Processor that uses a state store to save up 
records until you get the desired number, and then `context.forward` them as a 
list. After the map, you could have a reciprocal processor to turn the list 
back into individual records. I think the implementation of your feature would 
look something like this in practice, so you could also contribute valuable 
experience to the KIP discussion.
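
Hypothetical wiring for such a processor, reusing the {{BatchingTransformer}}
sketched above (store and topic names are placeholders):
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.Stores;

public class BatchingTopology {
  public static void main(final String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();

    // register the buffer store so the transformer can look it up by name
    builder.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("batch-store"),
            Serdes.String(),
            Serdes.String()));

    builder.<String, String>stream("my-input-topic")
        .transform(BatchingTransformer::new, "batch-store")
        .to("my-output-topic");

    // builder.build() then yields the Topology to pass to new KafkaStreams(...)
  }
}
{code}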

Thanks,

-John






[jira] [Commented] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data

2018-09-24 Thread sam (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625446#comment-16625446 ]

sam commented on KAFKA-7432:


[~mjsax] I see the similarity in use case, but KAFKA-6989 seems to be requesting
something like this:
{code:java}
builder.stream[String, String]("in")
  .async(numThreads = 10)
  .map(someCallThatIsSlowButNotCPUIntensive)
  .to("out")
{code}
but what we have is an external call that expects a batch of records, e.g. a
signature like this:
{code:java}
def externalBatchedApiCall(it: List[A]): List[B]
{code}

> API Method on Kafka Streams for processing chunks/batches of data
> -
>
> Key: KAFKA-7432
> URL: https://issues.apache.org/jira/browse/KAFKA-7432
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: sam
>Priority: Major
>
> For many situations in Big Data it is preferable to work with a small buffer 
> of records at a go, rather than one record at a time.
> The natural example is calling some external API that supports batching for 
> efficiency.
> How can we do this in Kafka Streams? I cannot find anything in the API that 
> looks like what I want.
> So far I have:
> {{builder.stream[String, String]("my-input-topic") 
> .mapValues(externalApiCall).to("my-output-topic")}}
> What I want is:
> {{builder.stream[String, String]("my-input-topic") .batched(chunkSize = 
> 2000).map(externalBatchedApiCall).to("my-output-topic")}}
> In Scala and Akka Streams the function is called {{grouped}} or {{batch}}. In 
> Spark Structured Streaming we can do 
> {{mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))}}.
>  
>  
> https://stackoverflow.com/questions/52366623/how-to-process-data-in-chunks-batches-with-kafka-streams





[jira] [Commented] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data

2018-09-23 Thread Matthias J. Sax (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625258#comment-16625258 ]

Matthias J. Sax commented on KAFKA-7432:


This sounds similar to KAFKA-6989. Can you elaborate on what the difference is
between the two tickets? If there is none, we could close this one as a
duplicate and track the request via KAFKA-6989.



