sam created KAFKA-7470:
--------------------------

             Summary: Thread safe accumulator across all instances
                 Key: KAFKA-7470
                 URL: https://issues.apache.org/jira/browse/KAFKA-7470
             Project: Kafka
          Issue Type: New Feature
          Components: streams
            Reporter: sam
For many situations in Big Data it is preferable to work with a small buffer of records at a time, rather than one record at a time. The natural example is calling some external API that supports batching for efficiency.

How can we do this in Kafka Streams? I cannot find anything in the API that looks like what I want.

So far I have:

{code:scala}
builder.stream[String, String]("my-input-topic")
  .mapValues(externalApiCall)
  .to("my-output-topic")
{code}

What I want is:

{code:scala}
builder.stream[String, String]("my-input-topic")
  .batched(chunkSize = 2000)
  .map(externalBatchedApiCall)
  .to("my-output-topic")
{code}

In Scala and Akka Streams this function is called {{grouped}} or {{batch}}; in Spark Structured Streaming we can do {{mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))}}.

https://stackoverflow.com/questions/52366623/how-to-process-data-in-chunks-batches-with-kafka-streams
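Pending a built-in operator, one workaround is to buffer records inside a {{Transformer}} from the Processor API. Below is a rough, untested sketch assuming the Scala DSL of Kafka Streams 2.2+ (where {{transform}} takes a {{TransformerSupplier}} producing {{KeyValue}}); {{externalBatchedApiCall}}, the chunk size, and the topic names are the hypothetical ones from the description. Note the buffer here is in memory only, so records buffered at the moment of a crash or rebalance are lost; a fault-tolerant version would have to spill the buffer to a state store.

{code:scala}
import java.time.Duration
import java.util.Properties

import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.{ProcessorContext, PunctuationType}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}

import scala.collection.mutable.ArrayBuffer

object BatchedStreamSketch extends App {

  // Hypothetical external API that accepts a whole batch per call (placeholder body).
  def externalBatchedApiCall(values: Seq[String]): Seq[String] = values

  // Buffers up to chunkSize records, then calls the batched API once and
  // forwards each result downstream. A wall-clock punctuation flushes partial
  // batches so tail records are not held indefinitely.
  class BatchingTransformer(chunkSize: Int)
      extends Transformer[String, String, KeyValue[String, String]] {

    private val buffer = ArrayBuffer.empty[(String, String)]
    private var context: ProcessorContext = _

    override def init(ctx: ProcessorContext): Unit = {
      context = ctx
      context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
        (_: Long) => flush())
    }

    override def transform(key: String, value: String): KeyValue[String, String] = {
      buffer += ((key, value))
      if (buffer.size >= chunkSize) flush()
      null // results are emitted via context.forward() inside flush()
    }

    private def flush(): Unit =
      if (buffer.nonEmpty) {
        val results = externalBatchedApiCall(buffer.map(_._2).toSeq)
        buffer.map(_._1).zip(results).foreach { case (k, v) => context.forward(k, v) }
        buffer.clear()
      }

    override def close(): Unit = flush()
  }

  val builder = new StreamsBuilder
  builder.stream[String, String]("my-input-topic")
    .transform(() => new BatchingTransformer(chunkSize = 2000))
    .to("my-output-topic")

  val props = new Properties
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "batching-sketch")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  new KafkaStreams(builder.build(), props).start()
}
{code}

The punctuation interval (10 seconds here) trades latency against batch fullness: a smaller interval emits partial batches sooner, a larger one gives the buffer more time to reach {{chunkSize}}.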