[jira] [Commented] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data
[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16749171#comment-16749171 ] Del Bao commented on KAFKA-7432:

A real-world scenario to back this use case: an event logging topic has 60 partitions. One of its many downstream consumers is a data materialization service: it consumes each event message, makes RPC calls to several external data sources in parallel to fetch signals related to the event, and then writes to a downstream sink (data store). The processing time per message is about 4 ms, i.e., 250 messages per second per partition, or 15k messages per second across the 60 partitions. The producing rate is far higher than this, which causes huge delays.

One solution is to repartition the upstream topic, but in a corporate setting there are too many dependencies for that to be easy. Micro-batching could be better: we process a batch of events and make a single RPC call for the whole batch. We can certainly handle the micro-batching at the application level, but it would be better to have this API in Kafka Streams.

> API Method on Kafka Streams for processing chunks/batches of data
> -----------------------------------------------------------------
>
> Key: KAFKA-7432
> URL: https://issues.apache.org/jira/browse/KAFKA-7432
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: sam
> Priority: Major
>
> For many situations in Big Data it is preferable to work with a small buffer of records at a go, rather than one record at a time.
> The natural example is calling some external API that supports batching for efficiency.
> How can we do this in Kafka Streams? I cannot find anything in the API that looks like what I want.
> So far I have:
> {{builder.stream[String, String]("my-input-topic").mapValues(externalApiCall).to("my-output-topic")}}
> What I want is:
> {{builder.stream[String, String]("my-input-topic").batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")}}
> In Scala and Akka Streams the function is called {{grouped}} or {{batch}}. In Spark Structured Streaming we can do {{mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))}}.
>
> https://stackoverflow.com/questions/52366623/how-to-process-data-in-chunks-batches-with-kafka-streams

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722855#comment-16722855 ] sam commented on KAFKA-7432:

[~Yohan123] This ticket is not really about "microbatching"; perhaps you could call it "nanobatching", since unlike Spark these batches are meant to be very small. Kafka itself is technically always nanobatching anyway, since you do not want to ack messages one by one, which is very inefficient. Typically, when you use the lower-level KafkaConsumer API, you process very small batches of data. I would hazard a guess (without reading the code) that this is also how Kafka Streams is implemented.
[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722635#comment-16722635 ] Richard Yu commented on KAFKA-7432:

Hi, I just want to point out something here. What Kafka currently supports is continuous processing, which Spark Streaming only recently implemented. In contrast, what this ticket suggests is micro-batch processing, in which data is sent in batches. In some data streaming circles, continuous processing is considered the best option for sending data, and micro-batching an older technique. I don't know that we need to implement this particular option, especially since overall latency for micro-batching is higher than for continuous processing.
[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627901#comment-16627901 ] Matthias J. Sax commented on KAFKA-7432:

As you point out that you don't care about sync vs. async, it might be valid to keep both tickets separate and only link them as related. Also note that you can build sync batch calls manually using a `transform()` with a state store: for each input record, you put it into the state store. Once the store has accumulated enough records, you get all records from the store and make one sync call to the external system with all of them. Afterwards, you forward whatever data you want to forward and finally delete all records from the store.
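The `transform()`-with-a-state-store recipe above can be sketched in plain Java. This is a minimal model of the buffering logic only: an in-memory list stands in for the state store, and the Kafka-specific wiring (`Transformer`, `KeyValueStore`, `context.forward`) is omitted; the class and method names are illustrative, not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Buffers incoming records; once chunkSize records have accumulated, makes a
// single synchronous call to the external system with the whole batch, then
// clears the buffer (the "delete all records from the store" step).
class BatchingTransform<A, B> {
    private final int chunkSize;
    private final Function<List<A>, List<B>> externalBatchedApiCall;
    private final List<A> buffer = new ArrayList<>(); // stand-in for the state store

    BatchingTransform(int chunkSize, Function<List<A>, List<B>> externalBatchedApiCall) {
        this.chunkSize = chunkSize;
        this.externalBatchedApiCall = externalBatchedApiCall;
    }

    // Called once per input record; returns results only when a batch flushes.
    Optional<List<B>> transform(A record) {
        buffer.add(record);
        if (buffer.size() < chunkSize) {
            return Optional.empty(); // not enough records accumulated yet
        }
        List<B> out = externalBatchedApiCall.apply(new ArrayList<>(buffer));
        buffer.clear();
        return Optional.of(out);
    }
}
```

A real implementation would also flush on a punctuator or on close, so that a partially filled batch is not held indefinitely.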
[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626682#comment-16626682 ] sam commented on KAFKA-7432:

[~mjsax] [~vvcephei] OK, let's merge them if you think that's a good idea; I guess they both come under "performance improvements". As long as it's clear that the intent here is to be able to batch data because an *external API accepts batches of data*, which is more efficient than calling the API once per single record. Whether or not this happens async was not part of my intention, but yes, if it could also happen async, that would be cool.
[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625919#comment-16625919 ] Matthias J. Sax commented on KAFKA-7432:

I tend to agree with John that batching async calls (if possible) would be an implementation detail (maybe exposed with some configs, if necessary). At the least, we should consider this in the design discussion; if it does not fit, we can still work on two independent features.
[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625446#comment-16625446 ] John Roesler commented on KAFKA-7432:

Hi [~sams],

This sounds like a valuable optimization. I agree that it's not the same request, but it seems like we'll get the best final result by considering this and KAFKA-6989 together in one design. Would it be OK with you to move this request into KAFKA-6989 for that reason? Then we could be sure that any KIP coming out of it would consider both batching and async.

In case you're looking for an immediate solution, I'd recommend using the Processor API to define a custom Processor that uses a state store to save up records until you have the desired number, and then `context.forward` them as a list. After the map, you could have a reciprocal processor to turn the list back into individual records. I think the implementation of your feature would look something like this in practice, so you could also contribute valuable experience to the KIP discussion.

Thanks,
-John
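The batch-then-unbatch round trip described above (save up records, forward them as a list, then a reciprocal processor to split the list back into single records) can be sketched without the Processor API. `grouped` mirrors the Scala/Akka operator named in the ticket, and `mapBatched` makes one external call per batch before flattening back; all names here are illustrative, not an existing Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class BatchRoundTrip {
    // Splits the input into consecutive chunks of at most `size` records,
    // like Scala's grouped(size).
    static <T> List<List<T>> grouped(List<T> in, int size) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < in.size(); i += size) {
            out.add(new ArrayList<>(in.subList(i, Math.min(i + size, in.size()))));
        }
        return out;
    }

    // One external call per batch, then the "reciprocal processor" step:
    // flatten the per-batch results back into individual records.
    static <A, B> List<B> mapBatched(List<A> records, int chunkSize,
                                     Function<List<A>, List<B>> externalBatchedApiCall) {
        return grouped(records, chunkSize).stream()
                .map(externalBatchedApiCall)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }
}
```

With chunkSize = 2000, this is the shape of the `batched(...)` operator the ticket asks for, just applied to an in-memory list rather than a live stream.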
[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625258#comment-16625258 ] sam commented on KAFKA-7432:

[~mjsax] I see the similarity in use case, but KAFKA-6988 seems to be requesting something like this:
{code:java}
builder.stream[String, String]("in").async(numThreads = 10).map(someCallThatIsSlowButNotCPUIntensive).to("out"){code}
whereas what we have is an external call that expects a batch of records, i.e. a signature like this:
{code:java}
def externalBatchedApiCall(it: List[A]): List[B]{code}
[ https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625258#comment-16625258 ] Matthias J. Sax commented on KAFKA-7432:

This sounds similar to KAFKA-6989. Can you elaborate on the difference between the two tickets? If there is none, we could close this one as a duplicate and track the request via KAFKA-6989.