Re: Kafka Streams application stuck rebalancing on startup

2020-10-27 Thread Sophie Blee-Goldman
>
> We've been able to get the crucial factors that cause this behavior down to
> a particular combination

What do you mean by this -- that you only see this when all four of those
operators are at play? Or do you see it with any one of them?

I guess the first thing to narrow down is whether it's actually rebalancing
or just restoring within this time (the REBALANCING state is somewhat
misleadingly named). If this is a completely new app then it's probably not
restoring, but if this app had already been running and building up state
before hitting this issue then that's probably the reason. It's not at all
uncommon for restoration to take more than 30 seconds.
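
If you want to check this from inside the app, here's a quick sketch
(assuming your own topology and props) that logs state transitions and any
restoration progress -- if the restore listener fires, it's restoring:

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StateRestoreListener;

KafkaStreams streams = new KafkaStreams(topology, props);

// Logs every transition, e.g. REBALANCING -> RUNNING
streams.setStateListener((newState, oldState) ->
    System.out.println("State: " + oldState + " -> " + newState));

// Fires only when the app is actually restoring state from the changelogs
streams.setGlobalStateRestoreListener(new StateRestoreListener() {
    public void onRestoreStart(TopicPartition tp, String store,
                               long startOffset, long endOffset) {
        System.out.println("Restoring " + store + " " + tp
            + " from offset " + startOffset + " to " + endOffset);
    }
    public void onBatchRestored(TopicPartition tp, String store,
                                long batchEndOffset, long numRestored) { }
    public void onRestoreEnd(TopicPartition tp, String store, long totalRestored) {
        System.out.println("Done restoring " + store + " " + tp);
    }
});

streams.start();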

If it really is rebalancing this entire time, then you need to look into
the logs to figure out why. I don't see anything obviously wrong with your
particular application, and even if there was, it should never result in
endless rebalances like this. How many instances of the application are you
running?

Cheers,
Sophie

On Thu, Oct 15, 2020 at 10:01 PM Alex Jablonski wrote:

> Hey there!
>
> My team and I have run across a bit of a jam in our application where,
> given a particular setup, our Kafka Streams application never seems to
> start successfully, instead just getting stuck in the REBALANCING state.
> We've been able to get the crucial factors that cause this behavior down to
> a particular combination of (1) grouping, (2) windowing, (3) aggregating,
> and (4) foreign-key joining, with some of those steps specifying Serdes
> besides the default.
>
> It's probably more useful to see a minimal example, so there's one here.
> The underlying Kafka Streams version is 2.5.1. The first test should show
> the application eventually transition to running state, but it doesn't
> within the 30 second timeout I've set. Interestingly, getting rid of the
> 'Grouped.with' argument to the 'groupBy' function and the
> 'Materialized.with' in 'aggregate' in the 'StreamsConfiguration' lets the
> application transition to "RUNNING", though without the correct Serdes
> that's not too valuable.
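>
> Roughly, the shape of the failing combination is this (a sketch with
> simplified names and types, not the exact code from the example):
>
> import java.time.Duration;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.kstream.*;
>
> StreamsBuilder builder = new StreamsBuilder();
> // windowed aggregation with explicit Serdes on the grouping and the store
> KTable<Windowed<String>, Long> counts = builder
>     .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>     .groupBy((key, value) -> value,
>         Grouped.with(Serdes.String(), Serdes.String()))
>     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
>     .aggregate(() -> 0L, (key, value, agg) -> agg + 1L,
>         Materialized.with(Serdes.String(), Serdes.Long()));
>
> plus a foreign-key join on the KTable side, via
> KTable#join(KTable, Function, ValueJoiner).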
>
> There might be a cleaner way to organize the particular flow in the toy
> example, but is there something fundamentally wrong with the approach laid
> out in that application that would cause Streams to be stuck in
> REBALANCING? I'd appreciate any advice folks could give!
>
> Thanks!
> Alex Jablonski
>


Re: Kafka Streams RocksDB CPU usage

2020-10-27 Thread Sophie Blee-Goldman
You might want to start with a longer commit interval, if you can handle
some additional latency. I would bet that the frequent flushing is a major
part of your problem: not just the act of flushing itself, but the
consequences for the structure of the data in each RocksDB instance. If you
end up flushing unfilled memtables then you'll end up with a large number
of small L0 files that then have to be compacted, and until they are, this
can make iterators/seeks less effective. It also means the memtable is less
effective as a write cache, so you miss out on some immediate deduplication
of updates to the same key.

There's been some recent work to decouple flushing from committing, so
starting in 2.7 you shouldn't have to choose between low latency and
cache/RocksDB performance. This release is currently in progress, but I'd
recommend checking it out when you can.

I'm not sure what version you're using, but in 2.5 we added some RocksDB
metrics that could be useful for further insight. I think they're all
recorded at the DEBUG level. Might be worth investigating.
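
Concretely, something like this sketch (the 30s interval is just a starting
point to experiment with, not a recommendation for your workload):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Commit -- and therefore flush -- much less often than your current 1s
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);
// The 512MB record cache you already have
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 512 * 1024 * 1024);
// Enables the RocksDB metrics added in 2.5 (they're recorded at DEBUG)
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");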

We also recently added some additional metrics to expose properties of
RocksDB, which will also be available in the upcoming 2.7 release.

Cheers,
Sophie

On Tue, Oct 27, 2020 at 1:49 PM Giselle van Dongen <
giselle.vandon...@ugent.be> wrote:

> Hi all,
>
>
> We have a Kafka Streams job which has high CPU utilization. When profiling
> the job, we saw that this was in large part due to RocksDB methods:
> flush, seek, put, get, iteratorCF. We use the default settings for our
> RocksDB state store. Which configuration parameters are most important to
> tune to lower CPU usage? Most documentation focuses on memory as the
> bottleneck.
>
>
> Our job does a join and window step. The commit interval is 1 second. We
> have caching enabled and the cache is 512MB. We have 6 instances with 6
> CPU and 30 GB RAM each.
>
>
>
> Thank you for any help!
>
>


Kafka Streams RocksDB high CPU usage

2020-10-27 Thread Giselle Van Dongen
Hi all,

We have a Kafka Streams application which is showing high CPU usage. When
profiling the application, we see that many of the hotspots are related to
RocksDB: the flush, seek0, put, iteratorCF, and get methods.

We are using the default configuration for RocksDB. We have read the
documentation but would like to ask for some pointers in case CPU is the
bottleneck.

We do a join and window step in our code. The commit interval is 1 second,
we have caching enabled, and the max cache size is 512MB. We have 5
instances (6 CPU, 30 GB RAM each).

Thank you!




Kafka Streams RocksDB CPU usage

2020-10-27 Thread Giselle van Dongen
Hi all,


We have a Kafka Streams job which has high CPU utilization. When profiling
the job, we saw that this was in large part due to RocksDB methods: flush,
seek, put, get, iteratorCF. We use the default settings for our RocksDB
state store.
Which configuration parameters are most important to tune to lower CPU usage? 
Most documentation focuses on memory as the bottleneck.


Our job does a join and window step. The commit interval is 1 second. We
have caching enabled and the cache is 512MB. We have 6 instances with 6 CPU
and 30 GB RAM each.



Thank you for any help!



Re: Client session timed out

2020-10-27 Thread Sabina Marx
Does anyone have any idea what we can do?

All ZooKeepers (3) and Kafka brokers are running (5 nodes, meaning 5
physical hosts). Then I reboot one physical host; I still have redundancy.
But when the physical host comes back up and ZooKeeper and then Kafka
start, Kafka times out and never connects to the existing Kafka cluster.

Log:
Started Apache Kafka.
INFO Registered kafka:type=kafka.Log4jController MBean 
(kafka.utils.Log4jControllerRegistration$)
INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable 
client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
INFO Registered signal handlers for TERM, INT, HUP 
(org.apache.kafka.common.utils.LoggingSignalHandler)
INFO starting (kafka.server.KafkaServer)
INFO Connecting to zookeeper on X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181 
(kafka.server.KafkaServer)
INFO [ZooKeeperClient Kafka server] Initializing a new session to 
X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181. (kafka.zookeeper.ZooKeeperClient)
INFO Client 
environment:zookeeper.version=3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, 
built on 05/04/2020 15:53 GMT (org.apache.zookeeper.ZooKeeper)
INFO Client environment:host.name=Kafka03.X.X (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.version=11.0.8 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.vendor=Debian (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.home=/usr/lib/jvm/java-11-openjdk-amd64 
(org.apache.zookeeper.ZooKeeper)
INFO Client 
environment:java.class.path=/opt/kafka/bin/../libs/activation-1.1.1.jar:/opt/kafka/bin/../libs/aopalliance-repackaged-2.5.0.jar:/opt/kafka/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/bin/../libs/audience-annotations-0.5.0.j
INFO Client 
environment:java.library.path=/usr/java/packages/lib:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.compiler= (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.version=4.19.0-10-amd64 
(org.apache.zookeeper.ZooKeeper)
INFO Client environment:user.name=it (org.apache.zookeeper.ZooKeeper)
INFO Client environment:user.home=/home/it (org.apache.zookeeper.ZooKeeper)
INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.memory.free=980MB (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.memory.max=1024MB (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.memory.total=1024MB (org.apache.zookeeper.ZooKeeper)
INFO Initiating client connection, 
connectString=X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181 sessionTimeout=18000 
watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@48b67364 
(org.apache.zookeeper.ZooKeeper)
INFO jute.maxbuffer value is 4194304 Bytes 
(org.apache.zookeeper.ClientCnxnSocket)
INFO zookeeper.request.timeout value is 0. feature enabled= 
(org.apache.zookeeper.ClientCnxn)
INFO [ZooKeeperClient Kafka server] Waiting until connected. 
(kafka.zookeeper.ZooKeeperClient)
INFO Opening socket connection to server kafka01.X.X/X.X.X.X:2181. Will not 
attempt to authenticate using SASL (unknown error) 
(org.apache.zookeeper.ClientCnxn)
INFO Socket connection established, initiating session, client: /X.X.X.X:45952, 
server: kafka01.X.X/X.X.X.X:2181 (org.apache.zookeeper.ClientCnxn)
WARN Client session timed out, have not heard from server in 6003ms for 
sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
INFO Client session timed out, have not heard from server in 6003ms for 
sessionid 0x0, closing socket connection and attempting reconnect 
(org.apache.zookeeper.ClientCnxn)
INFO Opening socket connection to server kafka05.X.X/X.X.X.X:2181. Will not 
attempt to authenticate using SASL (unknown error) 
(org.apache.zookeeper.ClientCnxn)
INFO Socket connection established, initiating session, client: /X.X.X.X:51582, 
server: kafka05.X.X/X.X.X.X:2181 (org.apache.zookeeper.ClientCnxn)
WARN Client session timed out, have not heard from server in 6003ms for 
sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
INFO Client session timed out, have not heard from server in 6003ms for 
sessionid 0x0, closing socket connection and attempting reconnect 
(org.apache.zookeeper.ClientCnxn)
INFO Opening socket connection to server X.X.X.X/X.X.X.X:2181. Will not attempt 
to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
INFO Socket connection established, initiating session, client: /X.X.X.X:44992, 
server: X.X.X.X/X.X.X.X:2181 (org.apache.zookeeper.ClientCnxn)
INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
WARN Client session timed out, have not heard from server in 6001ms for 
sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
INFO EventThread shut down for session: 

Re: multi-threaded consumer configuration like stream threads?

2020-10-27 Thread Liam Clarke-Hutchinson
Hi Pushkar,

No. You'd need to combine a consumer with a thread pool or similar as you
prefer. As the docs say (from
https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
)

We have intentionally avoided implementing a particular threading model for
> processing. This leaves several options for implementing multi-threaded
> processing of records.
> 1. One Consumer Per Thread
> A simple option is to give each thread its own consumer instance. Here are
> the pros and cons of this approach:
>
>- *PRO*: It is the easiest to implement
>
>
>- *PRO*: It is often the fastest as no inter-thread co-ordination is
>needed
>
>
>- *PRO*: It makes in-order processing on a per-partition basis very
>easy to implement (each thread just processes messages in the order it
>receives them).
>
>
>- *CON*: More consumers means more TCP connections to the cluster (one
>per thread). In general Kafka handles connections very efficiently so this
>is generally a small cost.
>
>
>- *CON*: Multiple consumers means more requests being sent to the
>server and slightly less batching of data which can cause some drop in I/O
>throughput.
>
>
>- *CON*: The number of total threads across all processes will be
>limited by the total number of partitions.
>
> 2. Decouple Consumption and Processing
> Another alternative is to have one or more consumer threads that do all
> data consumption and hand off ConsumerRecords instances to a blocking
> queue consumed by a pool of processor threads that actually handle the
> record processing. This option likewise has pros and cons:
>
>- *PRO*: This option allows independently scaling the number of
>consumers and processors. This makes it possible to have a single consumer
>that feeds many processor threads, avoiding any limitation on partitions.
>
>
>    - *CON*: Guaranteeing order across the processors requires particular
>    care, as the threads will execute independently: an earlier chunk of
>    data may actually be processed after a later chunk of data, just due
>    to the luck of thread execution timing. For processing that has no
>    ordering requirements this is not a problem.
>
>
>- *CON*: Manually committing the position becomes harder as it
>requires that all threads co-ordinate to ensure that processing is complete
>for that partition.
>
> There are many possible variations on this approach. For example each
> processor thread can have its own queue, and the consumer threads can hash
> into these queues using the TopicPartition to ensure in-order consumption
> and simplify commit.
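
To make option 2 concrete, here's a minimal sketch (hypothetical topic and
group names; note the offset-commit caveat in the comments). Option 1 is
simply N copies of a plain poll loop, one per thread.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DecoupledConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "my-consumer-group");        // placeholder
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        // One polling thread feeding a fixed pool of processor threads
        ExecutorService pool = Executors.newFixedThreadPool(8);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Hand off to the pool -- per-partition ordering is lost
                    // here, per the CON above
                    pool.submit(() -> process(record));
                }
                // NB: with the default enable.auto.commit=true, offsets may
                // be committed before the pool finishes processing -- the
                // manual-commit CON above
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.key() + " -> " + record.value());
    }
}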


Cheers,

Liam Clarke-Hutchinson

On Tue, Oct 27, 2020 at 8:04 PM Pushkar Deole  wrote:

> Hi,
>
> Is there any configuration in the Kafka consumer to specify multiple
> threads, the way there is in Kafka Streams?
> Essentially, can we have a consumer with multiple threads where the
> threads divide the partitions of a topic among them?
>


multi-threaded consumer configuration like stream threads?

2020-10-27 Thread Pushkar Deole
Hi,

Is there any configuration in the Kafka consumer to specify multiple
threads, the way there is in Kafka Streams?
Essentially, can we have a consumer with multiple threads where the threads
divide the partitions of a topic among them?