Hi All,
We recently migrated from Flink to Kafka Streams in production and are
facing a major issue. Any quick help would be much appreciated.
There are 72 input data topic partitions and 72 control stream topic
partitions. We run a minimum of 12 nodes with 6 stream threads on each
instance, and we autoscale based on CPU load. We also have scenarios
where an instance goes down and is replaced by a new one.
The problem we see is unequal partition allocation among instances. For
example, one node has 3 data partitions allocated per stream thread, and
the CPU on that node is at about 80%, whereas on another node only 4
stream threads have allocations, with one partition each.
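
To put numbers on the imbalance, here is a minimal sketch of the even-spread baseline. It assumes one unit of work per data partition (that is, the control topic is co-partitioned with the data topic, which is an assumption about the topology) and that every node runs all 6 threads. The class and method names are made up for illustration; this is plain arithmetic, not a Kafka Streams API.

```java
// Back-of-the-envelope baseline for an even spread; not Kafka Streams code.
class AssignmentBaseline {

    // Ceiling division: the most partitions any single slot would hold
    // if the partitions were spread as evenly as possible.
    static int evenShare(int partitions, int slots) {
        return (partitions + slots - 1) / slots;
    }

    // Partitions on a node when `busyThreads` threads each hold `perThread`.
    static int observedOnNode(int busyThreads, int perThread) {
        return busyThreads * perThread;
    }

    public static void main(String[] args) {
        int dataPartitions = 72;  // input data topic partitions
        int nodes = 12;           // minimum instance count
        int threadsPerNode = 6;   // num.stream.threads

        int perNode = evenShare(dataPartitions, nodes);
        int perThread = evenShare(dataPartitions, nodes * threadsPerNode);

        // The two nodes described above.
        int hotNode = observedOnNode(threadsPerNode, 3);
        int coldNode = observedOnNode(4, 1);

        System.out.println("even share per node:   " + perNode);
        System.out.println("even share per thread: " + perThread);
        System.out.println("observed hot node:     " + hotNode);
        System.out.println("observed cold node:    " + coldNode);
    }
}
```

Under those assumptions an even spread would be 6 data partitions per node and 1 per thread, against the 18 and 4 observed on the two nodes described.
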
Is there a way to distribute the partitions equally so that the incoming
data can be processed without much lag? At the moment some partitions
have very high lag while others are only a few thousand behind. This is
impacting our production system.
Streams Configuration:
acceptable.recovery.lag = 10000
application.id = prod-v1
application.server = *.*.*.*:80
bootstrap.servers = [*]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 104857600
client.id =
commit.interval.ms = 10000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.WallclockTimestampExtractor
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.windowed.key.serde.inner = null
default.windowed.value.serde.inner = null
max.task.idle.ms = 0
max.warmup.replicas = 2
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 1
num.stream.threads = 6
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
probing.rebalance.interval.ms = 600000
processing.guarantee = at_least_once
receive.buffer.bytes = 52428800
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 2
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /mnt/state
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
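
For reference, the entries in the list above that, as far as I understand, feed into where stateful tasks are placed are the thread count, the standby and warm-up replica counts, the acceptable recovery lag, and the probing rebalance interval. Below is a minimal sketch of just those entries as a plain `java.util.Properties` object, using the same keys and values as the dump. The class name is hypothetical, and the rest of the configuration is left out.

```java
import java.util.Properties;

// Hypothetical holder for the assignment-related subset of the config above.
class AssignmentRelatedSettings {

    static Properties build() {
        Properties p = new Properties();
        // Processing capacity each instance advertises.
        p.setProperty("num.stream.threads", "6");
        // Extra copies of state kept on other instances.
        p.setProperty("num.standby.replicas", "1");
        // Upper bound on extra replicas used to warm up state on a new owner.
        p.setProperty("max.warmup.replicas", "2");
        // Max changelog lag for an instance to count as caught up.
        p.setProperty("acceptable.recovery.lag", "10000");
        // How often a follow-up rebalance checks on warm-up progress (10 min).
        p.setProperty("probing.rebalance.interval.ms", "600000");
        return p;
    }

    public static void main(String[] args) {
        // Print the subset so it can be compared against the full dump.
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```
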
Thanks,
Navneeth