[jira] [Commented] (KAFKA-7552) StatefulProcessorNode tries to connect state store to processor before it is added

2018-10-27 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1275#comment-1275
 ] 

Bill Bejeck commented on KAFKA-7552:


Done

> StatefulProcessorNode tries to connect state store to processor before it is 
> added
> --
>
> Key: KAFKA-7552
> URL: https://issues.apache.org/jira/browse/KAFKA-7552
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Adam Bellemare
>Priority: Minor
> Fix For: 2.1.0
>
>
> StatefulProcessorNode tries to "connectProcessorAndStateStores" before 
> "addStateStore" is called on the state store. This throws an exception. 
> Current implementations of Kafka Streams do not appear to test for this, nor 
> do any of the kafka streams applications use it. Discovered while looking to 
> use the node for another ticket.
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java#L86]
>  
> Results in "org.apache.kafka.streams.errors.TopologyException: Invalid 
> topology: StateStore  is not added yet."
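
For readers who hit the same error, a minimal standalone sketch of the failure mode using the public Topology API (this is not the internal graph code linked above; the topic, processor, and store names are made up):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.Stores;

// Minimal sketch of the ordering issue, not the internal StatefulProcessorNode code.
public class ConnectBeforeAddSketch {
    public static void main(String[] args) {
        Topology topology = new Topology();
        topology.addSource("source", "input-topic");

        ProcessorSupplier<String, String> supplier = () -> new AbstractProcessor<String, String>() {
            @Override
            public void process(String key, String value) { /* no-op for the sketch */ }
        };
        topology.addProcessor("my-processor", supplier, "source");

        // Connecting before the store is registered throws
        // TopologyException: "Invalid topology: StateStore my-store is not added yet."
        // topology.connectProcessorAndStateStores("my-processor", "my-store");

        // Correct order: add the state store first, then connect it to the processor.
        topology.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("my-store"),
                Serdes.String(), Serdes.String()));
        topology.connectProcessorAndStateStores("my-processor", "my-store");
    }
}
{code}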



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7552) StatefulProcessorNode tries to connect state store to processor before it is added

2018-10-27 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-7552:
---
Fix Version/s: 2.1.0

> StatefulProcessorNode tries to connect state store to processor before it is 
> added
> --
>
> Key: KAFKA-7552
> URL: https://issues.apache.org/jira/browse/KAFKA-7552
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Adam Bellemare
>Priority: Minor
> Fix For: 2.1.0
>
>
> StatefulProcessorNode tries to "connectProcessorAndStateStores" before 
> "addStateStore" is called on the state store. This throws an exception. 
> Current implementations of Kafka Streams do not appear to test for this, nor 
> do any of the kafka streams applications use it. Discovered while looking to 
> use the node for another ticket.
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java#L86]
>  
> Results in "org.apache.kafka.streams.errors.TopologyException: Invalid 
> topology: StateStore  is not added yet."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7557) optimize LogManager.truncateFullyAndStartAt()

2018-10-27 Thread huxihx (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-7557:
-

Assignee: huxihx

> optimize LogManager.truncateFullyAndStartAt()
> -
>
> Key: KAFKA-7557
> URL: https://issues.apache.org/jira/browse/KAFKA-7557
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Jun Rao
>Assignee: huxihx
>Priority: Major
>
> When a ReplicaFetcherThread calls LogManager.truncateFullyAndStartAt() for a 
> partition, we call LogManager.checkpointLogRecoveryOffsetsInDir() and then 
> Log.deleteSnapshotsAfterRecoveryPointCheckpoint() on all the logs in that 
> directory. This requires listing all the files in each log dir to figure out 
> the snapshot files. If some logs have many log segment files, this could take 
> some time. This can potentially block a replica fetcher thread, which 
> indirectly causes the request handler threads to be blocked.
>  
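
To make the cost concrete, here is a small standalone sketch (not the actual Scala LogManager/Log code; it only assumes a log dir at /tmp/kafka-logs and Kafka's ".snapshot" suffix for producer-state snapshot files). Finding a log's snapshot files means listing every file in its directory, and the checkpoint path above does this for every log in the log dir, not only the truncated one:

{code:java}
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the expensive part: a full directory listing per log.
public class SnapshotScanSketch {
    static List<Path> listSnapshotFiles(Path logDir) throws IOException {
        List<Path> snapshots = new ArrayList<>();
        // Listing the whole directory: O(number of segment files) per log.
        try (DirectoryStream<Path> files = Files.newDirectoryStream(logDir)) {
            for (Path f : files) {
                if (f.getFileName().toString().endsWith(".snapshot")) {
                    snapshots.add(f);
                }
            }
        }
        return snapshots;
    }

    public static void main(String[] args) throws IOException {
        // The reported issue: this scan runs over every log in the directory on
        // each truncateFullyAndStartAt(), even though only one partition's log changed.
        try (DirectoryStream<Path> logDirs = Files.newDirectoryStream(Paths.get("/tmp/kafka-logs"))) {
            for (Path logDir : logDirs) {
                if (Files.isDirectory(logDir)) {
                    System.out.println(logDir + " -> " + listSnapshotFiles(logDir).size() + " snapshot files");
                }
            }
        }
    }
}
{code}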



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7552) StatefulProcessorNode tries to connect state store to processor before it is added

2018-10-27 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1253#comment-1253
 ] 

Matthias J. Sax commented on KAFKA-7552:


[~bbejeck] When was the PR merged? Can you update "fixed version" accordingly?

> StatefulProcessorNode tries to connect state store to processor before it is 
> added
> --
>
> Key: KAFKA-7552
> URL: https://issues.apache.org/jira/browse/KAFKA-7552
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Adam Bellemare
>Priority: Minor
>
> StatefulProcessorNode tries to "connectProcessorAndStateStores" before 
> "addStateStore" is called on the state store. This throws an exception. 
> Current implementations of Kafka Streams do not appear to test for this, nor 
> do any of the kafka streams applications use it. Discovered while looking to 
> use the node for another ticket.
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java#L86]
>  
> Results in "org.apache.kafka.streams.errors.TopologyException: Invalid 
> topology: StateStore  is not added yet."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7557) optimize LogManager.truncateFullyAndStartAt()

2018-10-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1250#comment-1250
 ] 

ASF GitHub Bot commented on KAFKA-7557:
---

huxihx opened a new pull request #5848: KAFKA-7557: optimize 
LogManager.truncateFullyAndStartAt()
URL: https://github.com/apache/kafka/pull/5848
 
 
   Instead of calling `deleteSnapshotsAfterRecoveryPointCheckpoint` for 
`allLogs`, a possible optimization could be invoking it only for the logs being 
truncated.
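
As a toy illustration of that direction (this is not the actual Scala LogManager code; the Log class below is just a stand-in), the change amounts to narrowing the loop from all logs in the directory to the truncated ones:

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy sketch of the proposed optimization; names are illustrative, not Kafka's code.
public class TruncateCleanupSketch {
    static class Log {
        final String topicPartition;
        Log(String topicPartition) { this.topicPartition = topicPartition; }
        void deleteSnapshotsAfterRecoveryPointCheckpoint() {
            // In the real code this lists the log's directory to find .snapshot files.
            System.out.println("cleaning snapshots for " + topicPartition);
        }
    }

    public static void main(String[] args) {
        List<Log> allLogsInDir = Arrays.asList(new Log("foo-0"), new Log("foo-1"), new Log("bar-0"));
        List<Log> truncatedLogs = Collections.singletonList(allLogsInDir.get(0));

        // Before: every log in the directory is scanned after each truncation.
        // allLogsInDir.forEach(Log::deleteSnapshotsAfterRecoveryPointCheckpoint);

        // Proposed: only the truncated logs are scanned.
        truncatedLogs.forEach(Log::deleteSnapshotsAfterRecoveryPointCheckpoint);
    }
}
{code}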
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> optimize LogManager.truncateFullyAndStartAt()
> -
>
> Key: KAFKA-7557
> URL: https://issues.apache.org/jira/browse/KAFKA-7557
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Jun Rao
>Priority: Major
>
> When a ReplicaFetcherThread calls LogManager.truncateFullyAndStartAt() for a 
> partition, we call LogManager.checkpointLogRecoveryOffsetsInDir() and then 
> Log.deleteSnapshotsAfterRecoveryPointCheckpoint() on all the logs in that 
> directory. This requires listing all the files in each log dir to figure out 
> the snapshot files. If some logs have many log segment files, this could take 
> some time. This can potentially block a replica fetcher thread, which 
> indirectly causes the request handler threads to be blocked.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7552) StatefulProcessorNode tries to connect state store to processor before it is added

2018-10-27 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1242#comment-1242
 ] 

Bill Bejeck commented on KAFKA-7552:


Adam, thanks for raising the issue.

This situation is fixed with the PR 
[https://github.com/apache/kafka/pull/5740], so I'm going to close this ticket 
as resolved.

To give you some context, a StatefulProcessorNode can be created in two cases:
 # KStream#transformXX and KStream#process operations can use state stores. 
However, the stores need to have been created ahead of time. In this case, the 
StatefulProcessorNode needs to have just the name of the previously registered 
store passed in.
 # When an aggregation is performed, the store needs to be built and registered 
as part of building this node.

So these two cases should be mutually exclusive: if the name of the store is 
provided, we know the usage is from a pre-registered store, and if the store 
builder is provided, we need to register the store for the processor.

The code in the org.apache.kafka.streams.kstream.internals.graph package was not 
available until the upcoming 2.1 release. The classes in this package are for 
internal use in building an intermediate representation of a Kafka Streams 
application and allow for performing optimizations.
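
To make the two cases concrete, here is an illustrative sketch (not the real StatefulProcessorNode; the class and builder interfaces below are simplified stand-ins) of the mutually exclusive handling described above:

{code:java}
// Illustrative sketch only; "TopologyBuilderLike" and "StoreBuilderLike" stand in
// for the internal builder and StoreBuilder types.
class StatefulNodeSketch {
    private final String processorName;
    private final String[] storeNames;           // case 1: pre-registered stores, by name
    private final StoreBuilderLike storeBuilder;  // case 2: store built as part of this node

    StatefulNodeSketch(String processorName, String[] storeNames, StoreBuilderLike storeBuilder) {
        if ((storeNames != null) == (storeBuilder != null)) {
            throw new IllegalArgumentException("Provide either store names or a store builder, not both");
        }
        this.processorName = processorName;
        this.storeNames = storeNames;
        this.storeBuilder = storeBuilder;
    }

    void writeToTopology(TopologyBuilderLike builder) {
        builder.addProcessor(processorName);
        if (storeNames != null) {
            // Case 1: the stores were registered earlier, so only connect them.
            builder.connectProcessorAndStateStores(processorName, storeNames);
        } else {
            // Case 2: register the store here; this also connects it to the processor.
            builder.addStateStore(storeBuilder, processorName);
        }
    }

    interface StoreBuilderLike { }

    interface TopologyBuilderLike {
        void addProcessor(String name);
        void connectProcessorAndStateStores(String name, String... storeNames);
        void addStateStore(StoreBuilderLike builder, String... processorNames);
    }
}
{code}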

 

Let me know if you have any more questions.

 

Thanks,

Bill

> StatefulProcessorNode tries to connect state store to processor before it is 
> added
> --
>
> Key: KAFKA-7552
> URL: https://issues.apache.org/jira/browse/KAFKA-7552
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Adam Bellemare
>Priority: Minor
>
> StatefulProcessorNode tries to "connectProcessorAndStateStores" before 
> "addStateStore" is called on the state store. This throws an exception. 
> Current implementations of Kafka Streams do not appear to test for this, nor 
> do any of the kafka streams applications use it. Discovered while looking to 
> use the node for another ticket.
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java#L86]
>  
> Results in "org.apache.kafka.streams.errors.TopologyException: Invalid 
> topology: StateStore  is not added yet."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2018-10-27 Thread Hugh O'Brien (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1233#comment-1233
 ] 

Hugh O'Brien commented on KAFKA-7504:
-

I would second Yuto's suggestion that this be made optional. As well as 
presuming Linux, it presumes certain filesystems. By performing a manual 
prefetch, it seems likely that a filesystem such as ZFS with a split-cache 
system would treat this single-use data as multi-use, which may bias the 
LRU/LFU split and obviate some of the benefits. I have not tested this.

> Broker performance degradation caused by call of sendfile reading disk in 
> network thread
> 
>
> Key: KAFKA-7504
> URL: https://issues.apache.org/jira/browse/KAFKA-7504
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
>Priority: Major
>  Labels: latency, performance
> Attachments: image-2018-10-14-14-18-38-149.png, 
> image-2018-10-14-14-18-57-429.png, image-2018-10-14-14-19-17-395.png, 
> image-2018-10-14-14-19-27-059.png, image-2018-10-14-14-19-41-397.png, 
> image-2018-10-14-14-19-51-823.png, image-2018-10-14-14-20-09-822.png, 
> image-2018-10-14-14-20-19-217.png, image-2018-10-14-14-20-33-500.png, 
> image-2018-10-14-14-20-46-566.png, image-2018-10-14-14-20-57-233.png
>
>
> h2. Environment
> OS: CentOS6
> Kernel version: 2.6.32-XX
> Kafka version: 0.10.2.1, 0.11.1.2 (but reproduces with latest build from 
> trunk (2.2.0-SNAPSHOT))
> h2. Phenomenon
> Response time of Produce requests (99th ~ 99.9th %ile) degrades to 50x ~ 100x 
> more than usual.
>  Normally the 99th %ile is lower than 20ms, but when this issue occurs it 
> reaches 50ms to 200ms.
> At the same time we could see two more things in metrics:
> 1. Disk reads on the volume assigned to log.dirs, coinciding with the degradation.
>  2. A rise in network thread utilization (as seen via 
> `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`)
> As we didn't see an increase of requests in the metrics, we suspected blocking in 
> the event loop run by the network thread as the cause of the rising network 
> thread utilization.
>  Reading through the Kafka broker source code, we understand that the only disk 
> IO performed in the network thread is reading log data through calling 
> sendfile(2) (via FileChannel#transferTo).
>  To verify that the calls of sendfile(2) are blocking the network thread for some 
> moments, I ran the following SystemTap script to inspect the duration of the 
> sendfile syscalls.
> {code:java}
> # SystemTap script to measure syscall duration
> global s
> global records
> 
> # record the entry timestamp of the traced syscall, keyed by thread id
> probe syscall.$1 {
>     s[tid()] = gettimeofday_us()
> }
> 
> # on return, compute the elapsed time and feed it into the histogram
> probe syscall.$1.return {
>     elapsed = gettimeofday_us() - s[tid()]
>     delete s[tid()]
>     records <<< elapsed
> }
> 
> # print a log-scale histogram of the recorded durations on exit
> probe end {
>     print(@hist_log(records))
> }{code}
> {code:java}
> $ stap -v syscall-duration.stp sendfile
> # value (us)
> value | count
>     0 |     0
>     1 |    71
>     2 |  6171
>    16 | 29472
>    32 |  3418
>  2048 |     0
>   ...
>  8192 |     3{code}
> As you can see, there were some cases taking more than a few milliseconds, which 
> implies that sendfile blocks the network thread for that long, applying the same 
> latency to all other request/response processing.
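
For reference, the sendfile(2) call mentioned above is reached through the JDK's FileChannel#transferTo. A minimal standalone sketch of that call (this is not Kafka's SocketServer code; the segment path and destination address are placeholders) looks like:

{code:java}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Standalone sketch of the FileChannel#transferTo call that maps to sendfile(2)
// on Linux; the log segment path and destination address are placeholders.
public class TransferToSketch {
    public static void main(String[] args) throws IOException {
        try (FileChannel segment = FileChannel.open(
                     Paths.get("/tmp/kafka-logs/topic-0/00000000000000000000.log"),
                     StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9999))) {
            long position = 0;
            long remaining = segment.size();
            while (remaining > 0) {
                // transferTo() blocks the calling thread while the kernel reads the
                // pages from disk if they are not already in the page cache.
                long sent = segment.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}
{code}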
> h2. Hypothesis
> Gathering the above observations, I made the following hypothesis.
> Let's say network-thread-1 is multiplexing 3 connections:
>  - producer-A
>  - follower-B (broker replica fetch)
>  - consumer-C
> The broker receives requests from each of those clients: [Produce, FetchFollower, 
> FetchConsumer].
> They are processed by the request handler threads, and now the response 
> queue of the network thread contains 3 responses in the following order: 
> [FetchConsumer, Produce, FetchFollower].
> network-thread-1 takes the 3 responses and processes them sequentially 
> ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L632]).
>  Ideally, processing of these 3 responses completes in microseconds, as it 
> just copies ready responses into the client sockets' buffers in a non-blocking 
> manner.
>  However, Kafka uses sendfile(2) for transferring log data to client sockets. 
> The target data might be in the page cache, but old data which was written a 
> while ago and never read since then is likely not.
>  If the target data isn't in the page cache, the kernel first needs to load the 
> target page into the cache. This takes more than a few