Hi All,

I'm using the Kafka 0.10.2 client with Kafka Streams.

I'm performing a groupByKey on a stream and seeing large files appear
within my state directory. Is this expected?

90M 1_0/rocksdb/content-count-store
82M 1_1/rocksdb/content-count-store
102M 1_10/rocksdb/content-count-store
86M 1_11/rocksdb/content-count-store
87M 1_12/rocksdb/content-count-store
85M 1_13/rocksdb/content-count-store
93M 1_14/rocksdb/content-count-store
87M 1_15/rocksdb/content-count-store
92M 1_16/rocksdb/content-count-store
97M 1_17/rocksdb/content-count-store
91M 1_18/rocksdb/content-count-store
94M 1_19/rocksdb/content-count-store
89M 1_2/rocksdb/content-count-store
88M 1_20/rocksdb/content-count-store
92M 1_21/rocksdb/content-count-store
83M 1_22/rocksdb/content-count-store
82M 1_23/rocksdb/content-count-store
83M 1_24/rocksdb/content-count-store
89M 1_25/rocksdb/content-count-store
91M 1_26/rocksdb/content-count-store
84M 1_27/rocksdb/content-count-store
87M 1_28/rocksdb/content-count-store
93M 1_29/rocksdb/content-count-store
88M 1_3/rocksdb/content-count-store
77M 1_30/rocksdb/content-count-store
101M 1_31/rocksdb/content-count-store
73M 1_32/rocksdb/content-count-store
89M 1_33/rocksdb/content-count-store
89M 1_34/rocksdb/content-count-store
82M 1_35/rocksdb/content-count-store
88M 1_36/rocksdb/content-count-store
82M 1_37/rocksdb/content-count-store
83M 1_38/rocksdb/content-count-store
92M 1_39/rocksdb/content-count-store
99M 1_4/rocksdb/content-count-store
88M 1_40/rocksdb/content-count-store
89M 1_41/rocksdb/content-count-store
84M 1_42/rocksdb/content-count-store
88M 1_43/rocksdb/content-count-store
91M 1_44/rocksdb/content-count-store
90M 1_45/rocksdb/content-count-store
81M 1_46/rocksdb/content-count-store
89M 1_47/rocksdb/content-count-store
81M 1_48/rocksdb/content-count-store
81M 1_49/rocksdb/content-count-store
82M 1_5/rocksdb/content-count-store
88M 1_50/rocksdb/content-count-store
88M 1_51/rocksdb/content-count-store
75M 1_52/rocksdb/content-count-store
85M 1_53/rocksdb/content-count-store
72M 1_54/rocksdb/content-count-store
89M 1_55/rocksdb/content-count-store
86M 1_56/rocksdb/content-count-store
87M 1_57/rocksdb/content-count-store
87M 1_58/rocksdb/content-count-store
94M 1_59/rocksdb/content-count-store
83M 1_6/rocksdb/content-count-store
88M 1_60/rocksdb/content-count-store
87M 1_61/rocksdb/content-count-store
102M 1_62/rocksdb/content-count-store
86M 1_63/rocksdb/content-count-store
85M 1_64/rocksdb/content-count-store
91M 1_65/rocksdb/content-count-store
86M 1_66/rocksdb/content-count-store
82M 1_67/rocksdb/content-count-store
85M 1_68/rocksdb/content-count-store
85M 1_69/rocksdb/content-count-store
87M 1_7/rocksdb/content-count-store
83M 1_70/rocksdb/content-count-store
84M 1_71/rocksdb/content-count-store
89M 1_72/rocksdb/content-count-store
82M 1_73/rocksdb/content-count-store
84M 1_74/rocksdb/content-count-store
86M 1_75/rocksdb/content-count-store
92M 1_76/rocksdb/content-count-store
85M 1_77/rocksdb/content-count-store
92M 1_78/rocksdb/content-count-store
84M 1_79/rocksdb/content-count-store
109M 1_8/rocksdb/content-count-store
99M 1_80/rocksdb/content-count-store
88M 1_81/rocksdb/content-count-store
103M 1_82/rocksdb/content-count-store
95M 1_83/rocksdb/content-count-store
89M 1_84/rocksdb/content-count-store
93M 1_85/rocksdb/content-count-store
84M 1_86/rocksdb/content-count-store
89M 1_87/rocksdb/content-count-store
95M 1_88/rocksdb/content-count-store
87M 1_89/rocksdb/content-count-store
89M 1_9/rocksdb/content-count-store
87M 1_90/rocksdb/content-count-store
100M 1_91/rocksdb/content-count-store
93M 1_92/rocksdb/content-count-store
93M 1_93/rocksdb/content-count-store
88M 1_94/rocksdb/content-count-store
88M 1_95/rocksdb/content-count-store
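
For reference, the store is populated by a groupByKey followed by a count; the relevant part of the topology looks roughly like this (a simplified sketch against the 0.10.2 API; the topic name and serdes are illustrative, only the store name is real):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

// "content-events" is an illustrative topic name.
KStream<String, String> content =
    builder.stream(Serdes.String(), Serdes.String(), "content-events");

// groupByKey + count materializes one RocksDB store per task under
// state.dir, which is where the 1_*/rocksdb/content-count-store
// directories above come from.
KTable<String, Long> counts = content
    .groupByKey(Serdes.String(), Serdes.String())
    .count("content-count-store");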

When the application starts up it goes through the group join/rebalance
process; this takes a couple of minutes and pulls the roughly 8 GB of data
above down into the state directory. After that the application starts
processing, but sometimes it fails with an exception like:

org.apache.kafka.streams.errors.LockException: task [1_57] Failed to
lock the state directory: /data/kafka-streams/stream-processors-id/1_57
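
For context, the application is started and stopped roughly like this (a sketch; props holds the settings listed further down):

KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
streams.start();

// Close on shutdown so the stream threads can release their
// state-directory locks before the JVM exits.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));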

When it doesn't hit the lock exception, it runs successfully for a while
and then eventually dies with a garbage collection error:

2017-02-27 11:57:55,894 - [ERROR] - [StreamThread-3] i.z.a.s.processors.RunProcessors - Unexpected Exception caught in thread [StreamThread-3]:
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOf(Arrays.java:3332)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at org.apache.kafka.common.metrics.JmxReporter.getMBeanName(JmxReporter.java:132)
    at org.apache.kafka.common.metrics.JmxReporter.removeAttribute(JmxReporter.java:96)
    at org.apache.kafka.common.metrics.JmxReporter.metricRemoval(JmxReporter.java:84)
    at org.apache.kafka.common.metrics.Metrics.removeMetric(Metrics.java:417)
    at org.apache.kafka.common.metrics.Metrics.removeSensor(Metrics.java:367)
    at org.apache.kafka.common.metrics.Metrics.removeSensor(Metrics.java:375)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.removeSensor(StreamsMetricsImpl.java:205)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$NodeMetrics.removeAllSensors(ProcessorNode.java:199)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.close(ProcessorNode.java:123)
    at org.apache.kafka.streams.processor.internals.StreamTask.closeTopology(StreamTask.java:355)
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:376)
    at org.apache.kafka.streams.processor.internals.StreamThread$5.apply(StreamThread.java:1046)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
    at org.apache.kafka.streams.processor.internals.StreamThread.closeAllTasks(StreamThread.java:1042)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:451)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:398)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:379)


I've set the following Java options to try to avoid this:

-Xms1024m
-Xmx6144m
-XX:+UseConcMarkSweepGC
-XX:+DisableExplicitGC
-XX:+CMSClassUnloadingEnabled

The streams application is running with the following properties:

streams {
  client.id = "stream-processors-id"
  application.id = "stream-processors-id"
  bootstrap.servers = "localhost:9092"
  replication.factor = "3"
  num.stream.threads = "4"
  state.dir = "/tmp/kafka-streams"
  buffered.records.per.partition = "100"
  poll.ms = "100"
  cache.max.bytes.buffering = 10485760
  commit.interval.ms = "30000"

  consumer {
    group.id = "stream-processors-id"
    auto.offset.reset = "earliest"
    max.poll.records = "100"
    session.timeout.ms = "90000"
    heartbeat.interval.ms = "40000"
    commit.interval.ms = "30000"
    max.poll.interval.ms = "300000"
    max.partition.fetch.bytes = "524288"
  }
}
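
In case it matters, that HOCON block is flattened into the Properties passed to StreamsConfig roughly like this (a sketch of the loading code, assuming Typesafe Config):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;
import java.util.Map;
import java.util.Properties;

Config streamsConf = ConfigFactory.load().getConfig("streams");

Properties props = new Properties();
// entrySet() flattens nested blocks, so the consumer overrides come
// through as dotted "consumer."-prefixed keys, which is the form
// Streams 0.10.2 expects for per-client overrides.
for (Map.Entry<String, ConfigValue> entry : streamsConf.entrySet()) {
    props.put(entry.getKey(), entry.getValue().unwrapped().toString());
}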

Thanks,
Ian.
