[jira] [Commented] (KAFKA-7552) StatefulProcessorNode tries to connect state store to processor before it is added
[ https://issues.apache.org/jira/browse/KAFKA-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1275#comment-1275 ]
Bill Bejeck commented on KAFKA-7552:
Done
> StatefulProcessorNode tries to connect state store to processor before it is added
> Key: KAFKA-7552
> URL: https://issues.apache.org/jira/browse/KAFKA-7552
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.0, 2.1.0
> Reporter: Adam Bellemare
> Priority: Minor
> Fix For: 2.1.0
>
> StatefulProcessorNode tries to "connectProcessorAndStateStores" before "addStateStore" is called on the state store. This throws an exception.
> Current implementations of Kafka Streams do not appear to test for this, nor do any of the Kafka Streams applications use it. Discovered while looking to use the node for another ticket.
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java#L86]
>
> Results in "org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore is not added yet."
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
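The ordering problem in the report can be reproduced with a toy model. This is a minimal sketch, assuming a hypothetical `ToyTopologyBuilder` (not Kafka's internal topology builder) that enforces only the rule behind the reported error: a state store must be added before a processor can be connected to it. The processor and store names are made up for illustration.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy builder, NOT Kafka's InternalTopologyBuilder. It only
// enforces the rule behind the reported error: a state store must be added
// before a processor can be connected to it.
class ToyTopologyBuilder {
    private final Set<String> processors = new LinkedHashSet<>();
    private final Map<String, Set<String>> storeToProcessors = new HashMap<>();

    void addProcessor(String processorName) {
        processors.add(processorName);
    }

    // Registers the store, then connects it to any processors passed in.
    void addStateStore(String storeName, String... processorNames) {
        storeToProcessors.putIfAbsent(storeName, new LinkedHashSet<>());
        for (String processorName : processorNames) {
            connectProcessorAndStateStores(processorName, storeName);
        }
    }

    void connectProcessorAndStateStores(String processorName, String... storeNames) {
        if (!processors.contains(processorName)) {
            throw new IllegalStateException("Processor " + processorName + " is not added yet.");
        }
        for (String storeName : storeNames) {
            Set<String> connected = storeToProcessors.get(storeName);
            if (connected == null) {
                // Same condition as the TopologyException quoted in KAFKA-7552.
                throw new IllegalStateException(
                        "Invalid topology: StateStore " + storeName + " is not added yet.");
            }
            connected.add(processorName);
        }
    }

    Set<String> processorsConnectedTo(String storeName) {
        return storeToProcessors.getOrDefault(storeName, new LinkedHashSet<>());
    }
}

class StoreOrderingDemo {
    // Order reported in the ticket: connect first, add later.
    static String connectThenAdd() {
        ToyTopologyBuilder builder = new ToyTopologyBuilder();
        builder.addProcessor("agg-processor");
        try {
            builder.connectProcessorAndStateStores("agg-processor", "agg-store");
            builder.addStateStore("agg-store");
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // Working order: add the store first, then connect the processor.
    static Set<String> addThenConnect() {
        ToyTopologyBuilder builder = new ToyTopologyBuilder();
        builder.addProcessor("agg-processor");
        builder.addStateStore("agg-store");
        builder.connectProcessorAndStateStores("agg-processor", "agg-store");
        return builder.processorsConnectedTo("agg-store");
    }

    public static void main(String[] args) {
        System.out.println(connectThenAdd()); // Invalid topology: StateStore agg-store is not added yet.
        System.out.println(addThenConnect()); // [agg-processor]
    }
}
```

In Kafka's public `Topology` API the corresponding calls are `addStateStore(StoreBuilder, String...)` and `connectProcessorAndStateStores(String, String...)`; the sketch only models the ordering rule between them, not their full behavior.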
[jira] [Updated] (KAFKA-7552) StatefulProcessorNode tries to connect state store to processor before it is added
[ https://issues.apache.org/jira/browse/KAFKA-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bill Bejeck updated KAFKA-7552:
Fix Version/s: 2.1.0
[jira] [Assigned] (KAFKA-7557) optimize LogManager.truncateFullyAndStartAt()
[ https://issues.apache.org/jira/browse/KAFKA-7557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
huxihx reassigned KAFKA-7557:
Assignee: huxihx
> optimize LogManager.truncateFullyAndStartAt()
> Key: KAFKA-7557
> URL: https://issues.apache.org/jira/browse/KAFKA-7557
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.0.0, 2.1.0
> Reporter: Jun Rao
> Assignee: huxihx
> Priority: Major
>
> When a ReplicaFetcherThread calls LogManager.truncateFullyAndStartAt() for a partition, we call LogManager.checkpointLogRecoveryOffsetsInDir() and then Log.deleteSnapshotsAfterRecoveryPointCheckpoint() on all the logs in that directory. This requires listing all the files in each log dir to figure out the snapshot files. If some logs have many log segment files, this could take some time. This can potentially block a replica fetcher thread, which indirectly causes the request handler threads to be blocked.
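The cost described above comes from directory listing: to find the snapshot files, every entry in a log directory has to be visited, even when only a handful of them are snapshots. A minimal sketch using `java.nio.file.Files.list`, assuming for illustration only that snapshot files are recognized by a `.snapshot` suffix and that each segment appears as a `.log`/`.index` pair; `SnapshotScanSketch` and its methods are hypothetical, not Kafka code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch, not Kafka code: locating snapshot files means listing the
// whole log directory and filtering by name. File names and suffixes here are
// illustrative assumptions.
class SnapshotScanSketch {
    static final class ScanResult {
        final int entriesListed;     // every directory entry the scan had to visit
        final List<Path> snapshots;  // entries that turned out to be snapshots
        ScanResult(int entriesListed, List<Path> snapshots) {
            this.entriesListed = entriesListed;
            this.snapshots = snapshots;
        }
    }

    static ScanResult findSnapshots(Path logDir) {
        try (Stream<Path> entries = Files.list(logDir)) {
            List<Path> all = entries.collect(Collectors.toList());
            List<Path> snapshots = all.stream()
                    .filter(p -> p.getFileName().toString().endsWith(".snapshot"))
                    .collect(Collectors.toList());
            return new ScanResult(all.size(), snapshots);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Creates a throwaway directory with `segments` segments (a .log and an
    // .index file each) plus two snapshot files with distinct names.
    static Path createFakeLogDir(int segments) {
        try {
            Path dir = Files.createTempDirectory("fake-log-");
            for (int i = 0; i < segments; i++) {
                String base = String.format("%020d", i * 1000L);
                Files.createFile(dir.resolve(base + ".log"));
                Files.createFile(dir.resolve(base + ".index"));
            }
            Files.createFile(dir.resolve(String.format("%020d", 0L) + ".snapshot"));
            Files.createFile(dir.resolve(String.format("%020d", (segments + 1) * 1000L) + ".snapshot"));
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ScanResult r = findSnapshots(createFakeLogDir(500));
        // 500 segments x 2 files + 2 snapshots = 1002 entries visited to find 2 snapshots.
        System.out.println(r.entriesListed + " entries listed, " + r.snapshots.size() + " snapshots");
    }
}
```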
[jira] [Commented] (KAFKA-7552) StatefulProcessorNode tries to connect state store to processor before it is added
[ https://issues.apache.org/jira/browse/KAFKA-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1253#comment-1253 ]
Matthias J. Sax commented on KAFKA-7552:
[~bbejeck] When was the PR merged? Can you update "fixed version" accordingly?
[jira] [Commented] (KAFKA-7557) optimize LogManager.truncateFullyAndStartAt()
[ https://issues.apache.org/jira/browse/KAFKA-7557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1250#comment-1250 ]
ASF GitHub Bot commented on KAFKA-7557:
huxihx opened a new pull request #5848: KAFKA-7557: optimize LogManager.truncateFullyAndStartAt()
URL: https://github.com/apache/kafka/pull/5848
Instead of calling `deleteSnapshotsAfterRecoveryPointCheckpoint` for `allLogs`, a possible optimization could be invoking it only for the logs being truncated.
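The optimization proposed in the PR can be compared with the original behavior using a simple cost model. It is a sketch under loud assumptions: `FakeLog` and all method names are hypothetical, each log is reduced to a file count, cleaning one log's snapshots is counted as one listing of its directory, and the recovery-point checkpoint itself is left out. With 200 logs of 300 files each, cleaning every log visits 200 × 300 = 60,000 directory entries per full truncation, while cleaning only the truncated log visits 300.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical cost model, not Kafka code. Each FakeLog only records how many
// files its directory holds, and cleaning its snapshots is modeled as one
// listing of that directory (the cost described in KAFKA-7557).
class TruncateCleanupSketch {
    static final class FakeLog {
        final String name;
        final int filesInDir;
        FakeLog(String name, int filesInDir) {
            this.name = name;
            this.filesInDir = filesInDir;
        }
    }

    final List<FakeLog> logsInDir = new ArrayList<>();

    static TruncateCleanupSketch withUniformLogs(int logCount, int filesPerLog) {
        TruncateCleanupSketch dir = new TruncateCleanupSketch();
        for (int i = 0; i < logCount; i++) {
            dir.logsInDir.add(new FakeLog("topic-" + i, filesPerLog));
        }
        return dir;
    }

    // Behavior described in the ticket: after a full truncation,
    // snapshot cleanup runs on every log in the directory.
    long filesListedCleaningAllLogs() {
        long listed = 0;
        for (FakeLog log : logsInDir) {
            listed += log.filesInDir;
        }
        return listed;
    }

    // Behavior proposed in PR #5848: clean up only the logs being truncated.
    long filesListedCleaningOnly(FakeLog... truncated) {
        long listed = 0;
        for (FakeLog log : truncated) {
            listed += log.filesInDir;
        }
        return listed;
    }

    public static void main(String[] args) {
        TruncateCleanupSketch dir = withUniformLogs(200, 300);
        FakeLog truncated = dir.logsInDir.get(7);
        System.out.println(dir.filesListedCleaningAllLogs());        // 60000
        System.out.println(dir.filesListedCleaningOnly(truncated));  // 300
    }
}
```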
[jira] [Commented] (KAFKA-7552) StatefulProcessorNode tries to connect state store to processor before it is added
[ https://issues.apache.org/jira/browse/KAFKA-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1242#comment-1242 ]
Bill Bejeck commented on KAFKA-7552:
Adam, thanks for raising the issue. This situation is fixed by the PR [https://github.com/apache/kafka/pull/5740], so I'm going to close this ticket as resolved.
To give you some context, a StatefulProcessorNode can be created in two cases:
# KStream#transformXX and KStream#process operations can use state stores. However, the stores need to have been created ahead of time. In this case, the StatefulProcessorNode only needs the name of the previously registered store passed in.
# When an aggregation is performed, the store needs to be built and registered as part of building this node.
So these two cases should be mutually exclusive: if the name of the store is provided, we know the usage is from a pre-registered store, and if the store builder is provided, we need to register the store for the processor.
The code in the org.apache.kafka.streams.kstream.internals.graph package is not available until the upcoming 2.1 release. The classes in this package are for internal use in building an intermediate representation of a Kafka Streams application and allow for performing optimizations.
Let me know if you have any more questions.
Thanks,
Bill
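The two mutually exclusive cases Bill describes can be sketched as follows. This is not Kafka's `StatefulProcessorNode`: the class, its factory methods and the string-recorded calls are hypothetical, and a store name stands in for a real `StoreBuilder`. What it illustrates is that a node either connects to stores that already exist or registers its own store, so it never tries to connect a store it has not added.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not Kafka's StatefulProcessorNode. It models the two
// mutually exclusive set-ups and records, as strings, the calls a topology
// builder would receive, so their order is visible.
class StatefulNodeSketch {
    private final String processorName;
    private final List<String> existingStoreNames; // case 1: stores registered beforehand
    private final String storeToBuild;             // case 2: store built by this node (name stands in for a builder)

    private StatefulNodeSketch(String processorName, List<String> existingStoreNames, String storeToBuild) {
        boolean hasNames = !existingStoreNames.isEmpty();
        boolean hasBuilder = storeToBuild != null;
        if (hasNames == hasBuilder) {
            throw new IllegalArgumentException("Provide either store names or a store builder, not both or neither.");
        }
        this.processorName = processorName;
        this.existingStoreNames = existingStoreNames;
        this.storeToBuild = storeToBuild;
    }

    // Case 1: transform/process style usage with pre-registered stores.
    static StatefulNodeSketch withExistingStores(String processorName, String... storeNames) {
        return new StatefulNodeSketch(processorName, Arrays.asList(storeNames), null);
    }

    // Case 2: aggregation style usage, where this node registers its own store.
    static StatefulNodeSketch withStoreBuilder(String processorName, String storeName) {
        return new StatefulNodeSketch(processorName, Collections.emptyList(), storeName);
    }

    List<String> writeToTopology() {
        List<String> calls = new ArrayList<>();
        calls.add("addProcessor(" + processorName + ")");
        if (storeToBuild != null) {
            // The store does not exist yet: add it and connect it in one step.
            calls.add("addStateStore(" + storeToBuild + ", " + processorName + ")");
        } else {
            // The stores were already added elsewhere, so connecting is safe.
            calls.add("connectProcessorAndStateStores(" + processorName + ", "
                    + String.join(", ", existingStoreNames) + ")");
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(withExistingStores("transform-proc", "counts-store").writeToTopology());
        // [addProcessor(transform-proc), connectProcessorAndStateStores(transform-proc, counts-store)]
        System.out.println(withStoreBuilder("aggregate-proc", "agg-store").writeToTopology());
        // [addProcessor(aggregate-proc), addStateStore(agg-store, aggregate-proc)]
    }
}
```

Validating in the constructor makes an invalid node (both or neither) fail when it is created rather than later, when the topology is built.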
[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread
[ https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1233#comment-1233 ]
Hugh O'Brien commented on KAFKA-7504:
I would second Yuto's suggestion that this be made optional. As well as presuming Linux, it presumes certain filesystems. By performing a manual prefetch, it seems likely that a filesystem with a split cache, such as ZFS, may treat this single-use data as multi-use, which may bias the LRU/LFU split and obviate some of the benefits. I have not tested this.
> Broker performance degradation caused by call of sendfile reading disk in network thread
> Key: KAFKA-7504
> URL: https://issues.apache.org/jira/browse/KAFKA-7504
> Issue Type: Improvement
> Components: core
> Affects Versions: 0.10.2.1
> Reporter: Yuto Kawamura
> Assignee: Yuto Kawamura
> Priority: Major
> Labels: latency, performance
> Attachments: image-2018-10-14-14-18-38-149.png, image-2018-10-14-14-18-57-429.png, image-2018-10-14-14-19-17-395.png, image-2018-10-14-14-19-27-059.png, image-2018-10-14-14-19-41-397.png, image-2018-10-14-14-19-51-823.png, image-2018-10-14-14-20-09-822.png, image-2018-10-14-14-20-19-217.png, image-2018-10-14-14-20-33-500.png, image-2018-10-14-14-20-46-566.png, image-2018-10-14-14-20-57-233.png
>
> h2. Environment
> OS: CentOS6
> Kernel version: 2.6.32-XX
> Kafka version: 0.10.2.1, 0.11.1.2 (but also reproduces with the latest build from trunk, 2.2.0-SNAPSHOT)
> h2. Phenomenon
> The response time of Produce requests (99th to 99.9th percentile) degrades to 50x to 100x the usual value.
> Normally the 99th percentile is lower than 20ms, but when this issue occurs it reaches 50ms to 200ms.
> At the same time, we could see two more things in the metrics:
> 1. Coincident disk reads from the volume assigned to log.dirs.
> 2. A rise in network thread utilization (observed via `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`).
> As we didn't see an increase in requests in the metrics, we suspected that blocking in the event loop run by the network thread was the cause of the rising network thread utilization.
> Reading through the Kafka broker source code, we understand that the only disk I/O performed in the network thread is reading log data by calling sendfile(2) (via FileChannel#transferTo).
> To verify that calls to sendfile(2) block the network thread for some time, I ran the following SystemTap script to inspect the duration of sendfile syscalls.
> {code:java}
> # Systemtap script to measure syscall duration
> # Usage: stap -v syscall-duration.stp <syscall name>
> global s        # entry timestamp (us) per thread id
> global records  # aggregate of elapsed times (us)
> probe syscall.$1 {
>   s[tid()] = gettimeofday_us()
> }
> probe syscall.$1.return {
>   elapsed = gettimeofday_us() - s[tid()]
>   delete s[tid()]
>   records <<< elapsed
> }
> probe end {
>   # print a log2-bucketed histogram of all recorded durations
>   print(@hist_log(records))
> }{code}
> {code:java}
> $ stap -v syscall-duration.stp sendfile
> # value (us)
> value | count
>     0 | 0
>     1 | 71
>     2 |@@@ 6171
>    16 |@@@ 29472
>    32 |@@@ 3418
>  2048 | 0
> ...
>  8192 | 3{code}
> As you can see, some calls took more than a few milliseconds, which implies that they block the network thread for that long and add the same latency to all other request/response processing.
> h2. Hypothesis
> Gathering the above observations, I made the following hypothesis.
> Let's say network-thread-1 is multiplexing 3 connections:
> - producer-A
> - follower-B (broker replica fetch)
> - consumer-C
> The broker receives requests from each of those clients: [Produce, FetchFollower, FetchConsumer].
> They are processed without issue by the request handler threads, and the response queue of the network thread now contains 3 responses in the following order: [FetchConsumer, Produce, FetchFollower].
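The effect the hypothesis describes can be illustrated with a toy simulation. It assumes that the network thread writes its queued responses one at a time and that each write is synchronous; the per-response costs (10 us for a page-cache hit, 5 ms for a miss) are made-up illustrative numbers, not measurements, and `HeadOfLineSketch` is hypothetical, not Kafka code.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation, not Kafka code: one network thread writes its queued
// responses strictly in order, and each write is treated as synchronous.
// The per-response costs are illustrative numbers, not measurements.
class HeadOfLineSketch {
    static final class Response {
        final String name;
        final long sendMicros; // time the thread spends writing this response
        Response(String name, long sendMicros) {
            this.name = name;
            this.sendMicros = sendMicros;
        }
    }

    // Time (us, from the start of the batch) at which each response's write finishes.
    static Map<String, Long> completionTimes(List<Response> queue) {
        Map<String, Long> done = new LinkedHashMap<>();
        long clock = 0;
        for (Response r : queue) {
            clock += r.sendMicros; // every later response waits for this one
            done.put(r.name, clock);
        }
        return done;
    }

    public static void main(String[] args) {
        // All data in page cache: each write is assumed to cost ~10 us.
        List<Response> warm = Arrays.asList(
                new Response("FetchConsumer", 10),
                new Response("Produce", 10),
                new Response("FetchFollower", 10));
        // Consumer fetch misses page cache: sendfile(2) assumed to block for 5 ms.
        List<Response> cold = Arrays.asList(
                new Response("FetchConsumer", 5_000),
                new Response("Produce", 10),
                new Response("FetchFollower", 10));
        System.out.println(completionTimes(warm)); // {FetchConsumer=10, Produce=20, FetchFollower=30}
        System.out.println(completionTimes(cold)); // {FetchConsumer=5000, Produce=5010, FetchFollower=5020}
    }
}
```

Under these assumptions the Produce response, which needs only 10 us of work itself, finishes about 5 ms late because it sits behind the cold consumer fetch in the same thread's queue.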
> network-thread-1 takes the 3 responses and processes them sequentially ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L632]).
> Ideally, processing these 3 responses completes in microseconds, as the thread just copies ready responses into the client sockets' buffers in a non-blocking manner.
> However, Kafka uses sendfile(2) to transfer log data to client sockets. The target data might be in the page cache, but old data that was written a while ago and never read since is likely not.
> If the target data isn't in the page cache, the kernel first needs to load the target page into the cache. This takes more than a few