Hi,

I need assistance with the scenario below. Please help me with this.

I am using a hopping time window in Kafka Streams with *suppress()*, and I am
seeing the following memory errors.

*1. The application runs into a memory issue after running continuously for
2 to 3 days without any restart of the machine.*

Exception in thread "change_detection_stream-08bd427d-36fd-467a-8923-4f7bb67aa949-StreamThread-2" java.lang.OutOfMemoryError: Java heap space
        at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:249)
        at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$385/0x0000000800bc7440.restoreBatch(Unknown Source)
        at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
        at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:83)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:310)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)


The machine has the following specifications:
RAM: 16GB

*2. The /tmp folder is also filling up with a large amount of data.*


*Kafka Version:* *2.1.0*

*I am adding the POC code below:*

// define the time window as a hopping time window
TimeWindows timeWindow = TimeWindows.of(Duration.ofHours(4))
        .advanceBy(Duration.ofMinutes(5))
        .grace(Duration.ofMinutes(1));

KTable<Windowed<String>, MetricsTimeSeries> windowedMetricsTimeSeriesStream =
    builder.stream("metrics_ip", Consumed.with(Serdes.String(), new JSONSerde<>()))
           .groupByKey()
           .windowedBy(timeWindow)
           .aggregate(
               () -> new MetricsTimeSeries(), /* initializer; MetricsTimeSeries is the aggregator class */
               (aggKey, newValue, aggValue) -> {
                   aggValue.addDataPoint(newValue);
                   return aggValue;
               }, /* adder */
               Materialized.as("windowed_aggregated_metric_store_poc_partitions")) /* state store name */
           .suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(<1GB>).shutDownWhenFull()));

windowedMetricsTimeSeriesStream.toStream()
    .map((key, value) -> /* mapping logic goes here */)
    .foreach(/* logic to validate and save */);
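
As a sanity check on how much state this topology keeps in flight: with a 4-hour window advancing every 5 minutes, each incoming record belongs to windowSize / advance overlapping windows, so the window store and the suppress buffer hold many entries per key at any moment. A small stand-alone calculation (hypothetical helper, not part of the POC):

```java
import java.time.Duration;

public class HoppingWindowOverlap {
    public static void main(String[] args) {
        Duration windowSize = Duration.ofHours(4);
        Duration advance = Duration.ofMinutes(5);
        // Each record falls into windowSize / advance overlapping hopping windows
        long windowsPerRecord = windowSize.toMillis() / advance.toMillis();
        System.out.println(windowsPerRecord); // prints 48
    }
}
```

So every record is aggregated into 48 concurrent windows, which multiplies the data the in-memory suppress buffer must retain until each window closes.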

*Properties set to Kafka Streams:*

StreamsConfig.APPLICATION_ID_CONFIG - "streams_changedetection_poc_partitions"
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG - "kafka:9092"
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG - Serdes.String().getClass()
StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS - Serdes.String().getClass()
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG - JSONSerde.class
StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS - JSONSerde.class
StreamsConfig.NUM_STREAM_THREADS_CONFIG - 2
StreamsConfig.PROCESSING_GUARANTEE_CONFIG - StreamsConfig.EXACTLY_ONCE
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG - 1000ms
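
In code form, these properties are set roughly as in the sketch below (a config fragment, not runnable on its own; JSONSerde is our application's own serde class):

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams_changedetection_poc_partitions");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class);
props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, JSONSerde.class);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); // 1000ms
```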



Thanks in advance.

Kalyani Y,
9177982636
