Hi, I need assistance with the scenario below. Please help me with this.
I am using a hopping time window in Kafka Streams with suppress(), and I am seeing the following memory errors.

1. The application runs out of heap space after running continuously for 2 to 3 days without a restart on the machine:

```
Exception in thread "change_detection_stream-08bd427d-36fd-467a-8923-4f7bb67aa949-StreamThread-2" java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249)
    at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$385/0x0000000800bc7440.restoreBatch(Unknown Source)
    at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
    at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
```

The machine has the following specification: RAM: 16 GB.

2.
The /tmp folder also keeps filling up.

Kafka version: 2.1.0

I am adding the POC code below:

```java
// define the time window as a hopping time window
TimeWindows timeWindow = TimeWindows.of(Duration.ofHours(4))
        .advanceBy(Duration.ofMinutes(5))
        .grace(Duration.ofMinutes(1));

// MetricsTimeSeries is the aggregator class
KTable<Windowed<String>, MetricsTimeSeries> windowedMetricsTimeSeriesStream =
    builder.stream("metrics_ip", Consumed.with(Serdes.String(), new JSONSerde<>()))
        .groupByKey()
        .windowedBy(timeWindow)
        .aggregate(
            () -> new MetricsTimeSeries(),   /* initializer */
            (aggKey, newValue, aggValue) -> {
                aggValue.addDataPoint(newValue);
                return aggValue;
            },                               /* adder */
            Materialized.as("windowed_aggregated_metric_store_poc_partitions")) /* state store name */
        .suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(<1GB>).shutDownWhenFull()));

windowedMetricsTimeSeriesStream.toStream()
        .map((key, value) -> /* mapping logic goes here */ )
        .foreach(/* logic to validate and save */);
```

Properties set on Kafka Streams:

```
StreamsConfig.APPLICATION_ID_CONFIG                    - "streams_changedetection_poc_partitions"
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG                 - "kafka:9092"
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG           - Serdes.String().getClass()
StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS   - Serdes.String().getClass()
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG         - JSONSerde.class
StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS - JSONSerde.class
StreamsConfig.NUM_STREAM_THREADS_CONFIG                - 2
StreamsConfig.PROCESSING_GUARANTEE_CONFIG              - StreamsConfig.EXACTLY_ONCE
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG                - 1000 ms
```

Thanks in advance.
Kalyani Y
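PS: a quick back-of-the-envelope check on the window configuration above, in case it is relevant to the buffer growth. With a 4-hour window advancing every 5 minutes, each incoming record is added to size/advance overlapping windows, so the suppress buffer can hold that many in-flight entries per key (a standalone sketch using only java.time; the durations are the ones from the topology above):

```java
import java.time.Duration;

public class WindowOverlap {
    public static void main(String[] args) {
        // Hopping window from the topology: size 4 h, advance 5 min.
        Duration size = Duration.ofHours(4);
        Duration advance = Duration.ofMinutes(5);

        // Each record belongs to size/advance overlapping windows,
        // i.e. 240 min / 5 min = 48 windows per record.
        long windowsPerRecord = size.toMillis() / advance.toMillis();
        System.out.println(windowsPerRecord); // 48
    }
}
```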
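Also, in case it helps to reproduce, the property list above corresponds roughly to the following java.util.Properties setup (a sketch using the string names the StreamsConfig constants resolve to; the serde entries are omitted here because JSONSerde is our custom class, so the snippet stays self-contained):

```java
import java.util.Properties;

public class PocStreamsProps {
    public static Properties build() {
        Properties props = new Properties();
        // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("application.id", "streams_changedetection_poc_partitions");
        // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", "kafka:9092");
        // StreamsConfig.NUM_STREAM_THREADS_CONFIG
        props.put("num.stream.threads", "2");
        // StreamsConfig.PROCESSING_GUARANTEE_CONFIG (StreamsConfig.EXACTLY_ONCE)
        props.put("processing.guarantee", "exactly_once");
        // StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
        props.put("commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("processing.guarantee"));
    }
}
```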