Hi,
I need help with the scenario below.
I am using a hopping time window in Kafka Streams together with suppress(), and
I am seeing the following memory errors.
*1. The Kafka Streams application runs out of memory after running
continuously for 2 to 3 days after deployment, without any restart of the
machine:*
Exception in thread "change_detection_stream-08bd427d-36fd-467a-8923-4f7bb67aa949-StreamThread-2" java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249)
    at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$385/0x0000000800bc7440.restoreBatch(Unknown Source)
    at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
    at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
The machine has the following specification:
RAM: 16 GB
*2. The /tmp folder is also filling up.*
*Kafka Version:* *2.1.0*
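For reference on item 1: the OutOfMemoryError is about the JVM heap, which is usually much smaller than the 16 GB of RAM unless -Xmx is set explicitly. The sketch below is only a rough way to compare the heap limit with the worst-case size of the suppress() buffers. It assumes the byte limit applies to each buffer instance (one per task), and the task count of 4 is a made-up example value. The class and method names are hypothetical and not part of Kafka Streams.

```java
final class HeapBudgetCheck {

    // Worst-case bytes held by suppress() buffers, assuming one buffer per task.
    static long worstCaseBufferBytes(long maxBytesPerBuffer, int taskCount) {
        return Math.multiplyExact(maxBytesPerBuffer, (long) taskCount);
    }

    // True if the required bytes do not exceed the given heap limit.
    static boolean fitsInHeap(long requiredBytes, long maxHeapBytes) {
        return requiredBytes <= maxHeapBytes;
    }

    public static void main(String[] args) {
        long maxHeap = Runtime.getRuntime().maxMemory(); // effective -Xmx of this JVM
        long oneGiB = 1L << 30;                          // buffer limit from the POC (about 1 GB)
        int taskCount = 4;                               // assumption: example value only
        long required = worstCaseBufferBytes(oneGiB, taskCount);
        System.out.printf("max heap = %d bytes, worst-case buffers = %d bytes, fits = %b%n",
                maxHeap, required, fitsInHeap(required, maxHeap));
    }
}
```

This ignores everything else on the heap (record caches, deserialized aggregates, restore batches), so a result of "fits" is only a lower bound on what is actually needed.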
*I am adding the POC code below*
// define the time window as a hopping time window
TimeWindows timeWindow = TimeWindows.of(Duration.ofHours(4))
        .advanceBy(Duration.ofMinutes(5))
        .grace(Duration.ofMinutes(1));

KTable<Windowed<String>, MetricsTimeSeries> windowedMetricsTimeSeriesStream =
        builder.stream("metrics_ip", Consumed.with(Serdes.String(), new JSONSerde<>()))
                .groupByKey()
                .windowedBy(timeWindow)
                .aggregate(
                        () -> new MetricsTimeSeries(), /* initializer; MetricsTimeSeries is the aggregator class */
                        (aggKey, newValue, aggValue) -> {
                            aggValue.addDataPoint(newValue);
                            return aggValue;
                        }, /* adder */
                        Materialized.as("windowed_aggregated_metric_store_poc_partitions")) /* state store name */
                .suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(<1GB>).shutDownWhenFull()));

windowedMetricsTimeSeriesStream.toStream()
        .map((key, value) -> /* mapping logic goes here */)
        .foreach(/* logic to validate and save */);
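For context on the window definition above: with a 4-hour window that advances every 5 minutes, each record falls into up to 48 overlapping windows. suppress(untilWindowCloses) keeps the latest aggregate of every open window until the window end plus the 1-minute grace period has passed. The small sketch below only does that arithmetic. The class and method names are hypothetical and not part of Kafka Streams.

```java
import java.time.Duration;

final class HoppingWindowMath {

    // Upper bound on the number of hopping windows that contain a single record:
    // ceil(size / advance). When size is a multiple of advance, this is exact.
    static long windowsPerRecord(Duration size, Duration advance) {
        if (advance.isZero() || advance.isNegative() || advance.compareTo(size) > 0) {
            throw new IllegalArgumentException("advance must be > 0 and <= size");
        }
        long sizeMs = size.toMillis();
        long advanceMs = advance.toMillis();
        return (sizeMs + advanceMs - 1) / advanceMs; // integer ceiling division
    }

    public static void main(String[] args) {
        // Values taken from the POC: 4-hour windows advancing every 5 minutes.
        System.out.println(windowsPerRecord(Duration.ofHours(4), Duration.ofMinutes(5))); // prints 48
    }
}
```

Since each MetricsTimeSeries aggregate accumulates data points for its whole window, one key can have up to 48 open aggregates buffered at once, which may be part of why memory grows over time.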
*Properties set to Kafka Streams:*
StreamsConfig.APPLICATION_ID_CONFIG - "streams_changedetection_poc_partitions"
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG - "kafka:9092"
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG - Serdes.String().getClass()
StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS - Serdes.String().getClass()
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG - JSONSerde.class
StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS - JSONSerde.class
StreamsConfig.NUM_STREAM_THREADS_CONFIG - 2
StreamsConfig.PROCESSING_GUARANTEE_CONFIG - StreamsConfig.EXACTLY_ONCE
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG - 1000 ms
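One setting is not in the list above: state.dir. Kafka Streams keeps its local state stores under that directory, and the default is /tmp/kafka-streams, which may be related to item 2. The sketch below shows how the same properties could be set with an explicit state directory, using plain string config keys so that it compiles without the Kafka jars. The path /var/lib/kafka-streams and the class name are assumptions for illustration only, and the serde settings are left out for brevity.

```java
import java.util.Properties;

final class StreamsStateDirConfig {

    // Builds the non-serde properties from the POC plus an explicit state.dir.
    static Properties baseConfig(String stateDir) {
        Properties props = new Properties();
        props.setProperty("application.id", "streams_changedetection_poc_partitions");
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("num.stream.threads", "2");
        props.setProperty("processing.guarantee", "exactly_once");
        props.setProperty("commit.interval.ms", "1000");
        // Without this entry, state stores are written under /tmp/kafka-streams.
        props.setProperty("state.dir", stateDir);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical location on a volume with enough free disk space.
        Properties props = baseConfig("/var/lib/kafka-streams");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Moving the directory only changes where the data is written. It does not reduce how much state the windowed store keeps.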
Thanks in advance.
Kalyani Y,
9177982636