+dev <[email protected]> Hi Shrikant, If you look into the expansion of BoundedReadfromUnboundedSource[1], you will notice that it will expand into Create single shard -> Split into multiple shard -> read from one shard. The number of records from one shard will not be larger than 10000 and the number of shards will not be larger than 100[2].
Back to your questions: the OutputReceiver outputs each received element to the downstream operation immediately. It does not keep records batched in memory. But you are right that BoundedReadFromUnboundedSource is memory-sensitive, especially if your records are large or your downstream operations consume a lot of memory.

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L93-L121
[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L156-L161

On Mon, Jan 11, 2021 at 5:48 AM shrikant bang <[email protected]> wrote:

> Hi Team,
>
> I have ETL use cases with Kafka as the source (in *Batch* mode) with
> SparkRunner. I am trying to understand the internals of KafkaIO.Read.
>
> Can someone please confirm whether my understanding is correct?
>
> - WindowedContextOutputReceiver is used for collecting Kafka records
>   from KafkaIO.Read via BoundedReadFromUnboundedSource [1].
> - All read Kafka records are stored in memory and spilled to the
>   downstream operation once the loop in the Read function ends [2].
>
> Ref:
> [1] https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L206
> [2] https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L201
>
> Thank You,
> Shrikant Bang.
>
> On Sun, Jan 10, 2021 at 11:43 PM shrikant bang <
> [email protected]> wrote:
>
>> Hi Team,
>>
>> I have the questions/understandings below about KafkaIO.Read in batch
>> mode:
>>
>> 1. From debugging, I built the understanding that KafkaIO converts the
>>    unbounded stream into a bounded read and *buffers all records* until
>>    either criterion is met: max records or max time to read.
>>    If this understanding is correct, then the read is memory-intensive,
>>    as KafkaIO has to buffer all read records before passing them
>>    downstream. Is my understanding correct?
>>
>> 2. If #1 is correct, is there any way we can keep writing records
>>    instead of buffering them in memory in the KafkaIO read operation
>>    (in batch mode)?
>>
>> Thank You,
>> Shrikant Bang
>>
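P.S. In case it helps to see the shape of the loop: a minimal sketch (simplified, not the exact Beam source; source, options, checkpointMark, maxNumRecords, and outputReceiver stand in for the internal fields) of how each record is handed off as soon as it is read:

    // Simplified sketch of the per-shard read loop. Each element goes to
    // the OutputReceiver immediately; nothing is accumulated in a batch.
    try (UnboundedSource.UnboundedReader<T> reader =
        source.createReader(options, checkpointMark)) {
      long recordsRead = 0;
      for (boolean more = reader.start();
          more && recordsRead < maxNumRecords;
          more = reader.advance()) {
        outputReceiver.output(reader.getCurrent());  // emitted right away
        recordsRead++;
      }
    }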
