[GitHub] [kafka] apovzner commented on pull request #9317: KAFKA-10509: Added throttle connection accept rate metric (KIP-612)
apovzner commented on pull request #9317: URL: https://github.com/apache/kafka/pull/9317#issuecomment-698128538 @dajac Thanks for the review, I addressed your comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] apovzner commented on a change in pull request #9317: KAFKA-10509: Added throttle connection accept rate metric (KIP-612)
apovzner commented on a change in pull request #9317: URL: https://github.com/apache/kafka/pull/9317#discussion_r494053882 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ##

@@ -1292,6 +1292,12 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   counts.synchronized {
     val startThrottleTimeMs = time.milliseconds
     val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startThrottleTimeMs), 0)
+    if (throttleTimeMs > 0) {
+      // record throttle time due to hitting connection rate limit
+      // connection could be throttled longer if the limit of the number of active connections is reached as well
+      maxConnectionsPerListener.get(listenerName)
+        .foreach(_.connectionRateThrottleSensor.record(throttleTimeMs.toDouble, startThrottleTimeMs))

Review comment: That code seemed readable to me, but perhaps looking up `ListenerConnectionQuota` is better. I made a change to look it up once, please take a look.
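The guarded recording in the diff above can be sketched in plain Java. `RateSensor`, `ListenerQuota`, and `recordConnection` are hypothetical stand-ins for illustration, not Kafka's actual classes; the point is only the guard: record throttle time on the per-listener sensor when, and only when, the computed throttle is positive.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a metrics sensor that accumulates recorded values.
class RateSensor {
    double total = 0.0;
    void record(double value, long nowMs) { total += value; }
}

// Hypothetical per-listener quota holder, looked up once per connection.
class ListenerQuota {
    final RateSensor connectionRateThrottleSensor = new RateSensor();
}

public class ThrottleRecordingSketch {
    static final Map<String, ListenerQuota> quotas = new HashMap<>();

    // Mirrors the guarded recording in the diff: clamp to zero, then record
    // on the listener's throttle sensor only if the connection was throttled.
    static long recordConnection(String listener, long computedThrottleMs, long nowMs) {
        long throttleTimeMs = Math.max(computedThrottleMs, 0);
        if (throttleTimeMs > 0) {
            Optional.ofNullable(quotas.get(listener))
                    .ifPresent(q -> q.connectionRateThrottleSensor.record((double) throttleTimeMs, nowMs));
        }
        return throttleTimeMs;
    }
}
```

An unthrottled connection (a non-positive computed throttle) leaves the sensor untouched, matching the comment in the diff that throttle time is recorded only when the connection rate limit is hit.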
[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation
dielhennr edited a comment on pull request #9101: URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305 Hey @jsancio , I added some work in progress to this branch including new APIs for this feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Should I just update the current KIP? I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made? EDIT: I updated the KIP's rejected alternatives section with a design that ties config registration to along with ClientInformation. This update would allow the user to see what dynamic configs are supported for each combination of client software name and version that is requesting the configs associated with a .
[GitHub] [kafka] apovzner commented on a change in pull request #9317: KAFKA-10509: Added throttle connection accept rate metric (KIP-612)
apovzner commented on a change in pull request #9317: URL: https://github.com/apache/kafka/pull/9317#discussion_r494034591 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1447,13 +1454,33 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend } } +def removeSensors(): Unit = { Review comment: agreed, `close` will be consistent. About not closing ListenerConnectionQuota on ConnectionQuotas.close(): this is on the broker shutdown path, where KafkaServer calls metrics.close() on shutdown as well (and metrics are owned by KafkaServer), so I don't think we are leaking anything. But I think it's better to remove listener sensors on ConnectionQuotas.close() anyway, so I will make that change.
[jira] [Assigned] (KAFKA-9075) Extend documentation for usage of GlobalKTable vs KTable
[ https://issues.apache.org/jira/browse/KAFKA-9075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ankit Kumar reassigned KAFKA-9075: -- Assignee: Ankit Kumar > Extend documentation for usage of GlobalKTable vs KTable > > > Key: KAFKA-9075 > URL: https://issues.apache.org/jira/browse/KAFKA-9075 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Reporter: Boyang Chen >Assignee: Ankit Kumar >Priority: Minor > Labels: newbie, newbie++ > > We have a KIP which implements global KTable and explains its design > reasoning and comparison with general KTable. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams] > The missing piece is porting this information to the official documentation, > to make this choice easier for users. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…
chia7712 commented on pull request #9284: URL: https://github.com/apache/kafka/pull/9284#issuecomment-698083970 ``` Build / JDK 11 / kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup ``` It is flaky on the trunk branch as well, so it is unrelated to this patch.
[jira] [Resolved] (KAFKA-10297) Don't use deprecated producer config `retries`
[ https://issues.apache.org/jira/browse/KAFKA-10297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-10297. - Resolution: Invalid We changed the KIP to not deprecate `retries`. Hence, this ticket is invalid now. Cf [https://github.com/apache/kafka/pull/9333] > Don't use deprecated producer config `retries` > -- > > Key: KAFKA-10297 > URL: https://issues.apache.org/jira/browse/KAFKA-10297 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.7.0 >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 2.7.0 > > > In the 2.7.0 release, producer config `retries` gets deprecated via KIP-572. > Connect is still using this config, which needs to be fixed (cf > [https://github.com/apache/kafka/pull/8864/files#r439685920]) > {quote}Btw: @hachikuji raised a concern about this issue, too: > https://github.com/apache/kafka/pull/8864#pullrequestreview-443424531 > > I just had one question about the proposal. Using retries=0 in the producer > > allows the user to have "at-most-once" delivery. This allows the > > application to implement its own retry logic for example. Do we still have > > a way to do this once this configuration is gone? > So maybe we need to do some follow-up work in the `Producer` to make it work > for Connect. But I would defer this to the follow-up work. > My original thought was that setting `max.deliver.timeout.ms := request.timeout.ms` might prevent internal retries. But we need to verify this. It > was also brought to my attention that this might not work if the network > disconnects -- only `retries=0` would prevent re-opening the connection, but a > low timeout would not prevent retries. > In KIP-572, we proposed for Kafka Streams itself to treat `task.timeout.ms = 0` as "no retries" -- maybe we can do a similar thing for the producer? > There is also `max.block.ms` that we should consider. Unfortunately, I am not > an expert on the `Producer`.
But I would like to move forward with KIP-572 > for now and am happy to help resolve those questions. > {quote}
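The at-most-once question quoted above boils down to a producer configuration like the following. This is a sketch only: the keys are real producer config names, but whether `retries=0` alone still yields at-most-once once the config is deprecated is exactly the open question in the ticket, and the `acks=1` choice here is an added assumption for illustration.

```java
import java.util.Properties;

public class AtMostOnceConfigSketch {
    // Sketch of the "no internal retries" producer setup discussed in the ticket.
    static Properties atMostOnceProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("retries", "0"); // no internal retries: at-most-once per send
        props.setProperty("acks", "1");    // illustrative assumption, not from the ticket
        // The ticket floats delivery.timeout.ms == request.timeout.ms as a possible
        // replacement once `retries` is gone, but notes this may not cover network
        // disconnects; it is left as a comment here rather than asserted.
        return props;
    }
}
```

An application built this way would implement its own retry logic on top, which is the use case @hachikuji raised.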
[GitHub] [kafka] mjsax commented on a change in pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config
mjsax commented on a change in pull request #8864: URL: https://github.com/apache/kafka/pull/8864#discussion_r493976743 ## File path: streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java ## @@ -45,6 +45,7 @@ private static final String SINK_TOPIC = "streamsResilienceSink"; +@SuppressWarnings("deprecation") // TODO revisit in follow up PR Review comment: Reverted via #9333 ## File path: streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java ## @@ -122,6 +123,7 @@ public void apply(final String key, final String value) { }); } +@SuppressWarnings("deprecation") // TODO revisit in follow up PR Review comment: Reverted via #9333 ## File path: streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java ## @@ -49,7 +49,7 @@ public class StreamsOptimizedTest { - +@SuppressWarnings("deprecation") // TODO revisit in follow up PR Review comment: Reverted via #9333 ## File path: streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java ## @@ -43,6 +43,7 @@ public class StreamsStandByReplicaTest { +@SuppressWarnings("deprecation") // TODO revisit in follow up PR Review comment: Reverted via #9333 ## File path: streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java ## @@ -158,6 +159,7 @@ private static void shutdown(final KafkaStreams streams) { streams.close(Duration.ofSeconds(10)); } +@SuppressWarnings("deprecation") // TODO revisit in follow up PR Review comment: Reverted via #9333
[GitHub] [kafka] mjsax opened a new pull request #9333: KAFKA-9274: Revert deprecation of `retries` for producer and admin clients
mjsax opened a new pull request #9333: URL: https://github.com/apache/kafka/pull/9333 Partially reverts #8864 Call for review @vvcephei @hachikuji @cmccabe @ijuma
[GitHub] [kafka] ijuma commented on a change in pull request #9331: MINOR: Use JUnit 5 in raft module
ijuma commented on a change in pull request #9331: URL: https://github.com/apache/kafka/pull/9331#discussion_r493969248 ## File path: raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java ## @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.raft; - -import org.junit.Test; - -public class VotedStateTest { Review comment: Thanks, you saved me from having to do that. :)
[jira] [Created] (KAFKA-10519) Unit tests for VotedState
Jason Gustafson created KAFKA-10519: --- Summary: Unit tests for VotedState Key: KAFKA-10519 URL: https://issues.apache.org/jira/browse/KAFKA-10519 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson We accidentally checked in an empty test class `VotedStateTest`. We should add missing unit tests.
[GitHub] [kafka] hachikuji commented on a change in pull request #9331: MINOR: Use JUnit 5 in raft module
hachikuji commented on a change in pull request #9331: URL: https://github.com/apache/kafka/pull/9331#discussion_r493968673 ## File path: raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java ## @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.raft; - -import org.junit.Test; - -public class VotedStateTest { Review comment: Ouch. Probably meant to come back to this. Filed https://issues.apache.org/jira/browse/KAFKA-10519.
[GitHub] [kafka] hachikuji opened a new pull request #9332: KAFKA-10511; Ensure monotonic start epoch/offset updates in `MockLog`
hachikuji opened a new pull request #9332: URL: https://github.com/apache/kafka/pull/9332 There is a minor difference between the epoch caching logic in `MockLog` and the behavior in `LeaderEpochFileCache`. The latter ensures that every new epoch/start offset entry added to the cache increases monotonically over the previous entries. This patch brings the behavior of `MockLog` in line. It also simplifies the `assignEpochStartOffset` api in `ReplicatedLog`. We always intend to use the log end offset, so this patch removes the start offset parameter. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
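The monotonicity rule the patch ports from `LeaderEpochFileCache` into `MockLog` can be sketched as follows. The class and method names here are illustrative, not Kafka's actual API; the invariant is that each new (epoch, start offset) entry must strictly exceed the previous one in both fields.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal sketch of a monotonic epoch cache: a new (epoch, startOffset)
// entry is accepted only if both fields strictly increase over the last entry.
public class EpochCacheSketch {
    static class Entry {
        final int epoch;
        final long startOffset;
        Entry(int epoch, long startOffset) { this.epoch = epoch; this.startOffset = startOffset; }
    }

    private final Deque<Entry> entries = new ArrayDeque<>();

    // Returns true if the entry was appended, false if it would break monotonicity.
    public boolean assign(int epoch, long startOffset) {
        Entry last = entries.peekLast();
        if (last != null && (epoch <= last.epoch || startOffset <= last.startOffset)) {
            return false; // reject non-monotonic entry
        }
        entries.addLast(new Entry(epoch, startOffset));
        return true;
    }

    public int size() { return entries.size(); }
}
```

Under this rule, re-assigning the same epoch or an earlier start offset is a no-op rather than a cache corruption, which is the behavioral gap the patch closes in `MockLog`.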
[GitHub] [kafka] ijuma opened a new pull request #9331: MINOR: Use JUnit 5 in raft module
ijuma opened a new pull request #9331: URL: https://github.com/apache/kafka/pull/9331 I also removed a test class with no tests currently. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-10518) Consumer fetches could be inefficient when lags are unbalanced
Dhruvil Shah created KAFKA-10518: Summary: Consumer fetches could be inefficient when lags are unbalanced Key: KAFKA-10518 URL: https://issues.apache.org/jira/browse/KAFKA-10518 Project: Kafka Issue Type: Bug Reporter: Dhruvil Shah Consumer fetches are inefficient when lags are imbalanced across partitions, due to head-of-line blocking and the behavior of blocking for `fetch.max.wait.ms` until data is available. When the consumer receives a fetch response, it prepares the next fetch request and sends it out. The caveat is that the subsequent fetch request explicitly excludes partitions for which the consumer received data in the previous round. This allows the consumer application to drain the data for those partitions while the consumer fetches the other partitions it is subscribed to. This behavior does not play out well when lag is unbalanced, because the consumer receives data for the partitions it is lagging on and then sends a fetch request for partitions that have little or no data. The latter ends up blocking for `fetch.max.wait.ms` on the broker before an empty response is sent back. This slows down the consumer's overall consumption throughput, since `fetch.max.wait.ms` is 500ms by default.
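A toy model of the head-of-line blocking described above: partitions that returned data in one round are excluded from the next fetch, so with a single lagging partition every other round fetches nothing and blocks for the full wait. All names and numbers here are illustrative, not the consumer's actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy simulation of fetch rounds with the "exclude just-fetched partitions" rule.
public class FetchLagSketch {
    static final long FETCH_MAX_WAIT_MS = 500; // broker default discussed in the ticket

    // Returns total time spent blocked on empty fetches over `rounds` rounds.
    // `lag` maps partition name to remaining records and is mutated in place.
    static long simulate(Map<String, Long> lag, int rounds, long recordsPerFetch) {
        long blockedMs = 0;
        List<String> excluded = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            boolean gotData = false;
            List<String> nextExcluded = new ArrayList<>();
            for (Map.Entry<String, Long> e : lag.entrySet()) {
                if (excluded.contains(e.getKey()) || e.getValue() == 0) continue;
                long taken = Math.min(e.getValue(), recordsPerFetch);
                e.setValue(e.getValue() - taken);
                nextExcluded.add(e.getKey()); // being drained, so skipped next round
                gotData = true;
            }
            if (!gotData) blockedMs += FETCH_MAX_WAIT_MS; // empty response after full wait
            excluded = nextExcluded;
        }
        return blockedMs;
    }
}
```

With one partition lagging by 1000 records and 100 records per fetch, half of all rounds in this model are spent waiting `fetch.max.wait.ms` for partitions with nothing to return, which is the throughput loss the ticket describes.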
[jira] [Updated] (KAFKA-10517) Inefficient consumer processing with fetch sessions
[ https://issues.apache.org/jira/browse/KAFKA-10517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruvil Shah updated KAFKA-10517: - Description: With the introduction of fetch sessions, the consumer and the broker share a unified view of the partitions being consumed and their current state (fetch_offset, last_propagated_hwm, last_propagated_start_offset, etc.). The consumer is still expected to consume in a round-robin manner; however, we have observed certain cases where this does not hold. Because of how we perform memory management on the consumer and implement fetch pipelining, we exclude partitions from a `FetchRequest` when they have not been drained by the application. This is done by adding these partitions to the `toForget` list in the `FetchRequest`. When partitions are added to the `toForget` list, the broker removes these partitions from its session cache. This causes a bit of a divergence between the broker's and the client's view of the metadata. When forgotten partitions are added back to the fetch request after the application has drained them, the server will immediately add them back to the session cache and return a response for them, even if there is no corresponding data. This re-triggers the behavior on the consumer to put these partitions on the `toForget` list incorrectly, even though no data for them may have been returned. We have seen this behavior cause an imbalance in lags across partitions, as the consumer no longer obeys the round-robin sequence given that the partitions keep shuffling between the `toForget` and `toSend` lists. At a high level, this is caused by the out-of-sync session caches on the consumer and broker. This ends up in a state where the partition balance is maintained by external factors (such as whether metadata was returned for a partition), rather than following the round-robin ordering.
> Inefficient consumer processing with fetch sessions > --- > > Key: KAFKA-10517 > URL: https://issues.apache.org/jira/browse/KAFKA-10517 > Project: Kafka > Issue Type: Bug >Reporter: Dhruvil Shah >Priority: Major > > With the introduction of fetch sessions, the consumer and the broker share a > unified view of the partitions being consumed and their current state > (fetch_offset, last_propagated_hwm, last_propagated_start_offset, etc.). The > consumer is still expected to consume in a round robin manner, however, we > have observed certain cases where this is not the case. > Because of how we perform memory management on the consumer and implement > fetch pipelining, we exclude partitions from a `FetchRequest` when they have > not been drained by the application. This is done by adding these partitions > to the `toForget` list in the `FetchRequest`. When partitions are added to > the `toForget` list, the broker removes these partitions from its session > cache. This causes bit of a divergence between the broker's and the client's > view of the metadata. > When forgotten partitions are added back
[jira] [Created] (KAFKA-10517) Inefficient consumer processing with fetch sessions
Dhruvil Shah created KAFKA-10517: Summary: Inefficient consumer processing with fetch sessions Key: KAFKA-10517 URL: https://issues.apache.org/jira/browse/KAFKA-10517 Project: Kafka Issue Type: Bug Reporter: Dhruvil Shah With the introduction of fetch sessions, the consumer and the broker share a unified view of the partitions being consumed and their current state (fetch_offset, last_propagated_hwm, last_propagated_start_offset, etc.). The consumer is still expected to consume in a round-robin manner; however, we have observed certain cases where this does not hold. Because of how we perform memory management on the consumer and implement fetch pipelining, we exclude partitions from a FetchRequest when they have not been drained by the application. This is done by adding these partitions to the `toForget` list in the `FetchRequest`. When partitions are added to the `toForget` list, the broker removes these partitions from its session cache. This causes a bit of a divergence between the broker's and the client's view of the metadata. When forgotten partitions are added back to the fetch request after the application has drained them, the server will immediately add them back to the session cache and return a response for them, even if there is no corresponding data. This re-triggers the behavior on the consumer to put these partitions on the `toForget` list incorrectly, even though no data for them may have been returned. We have seen this behavior cause an imbalance in lags across partitions, as the consumer no longer obeys the round-robin sequence given that the partitions keep shuffling between the `toForget` and `toSend` lists. At a high level, this is caused by the out-of-sync session caches on the consumer and broker. This ends up in a state where the partition balance is maintained by external factors (such as whether metadata was returned for a partition), rather than following the round-robin ordering.
[GitHub] [kafka] wushujames commented on pull request #9276: KAFKA-10473: Add docs on partition size-on-disk, and other log-related metrics
wushujames commented on pull request #9276: URL: https://github.com/apache/kafka/pull/9276#issuecomment-697994325 The pull request lists lots of failed checks. However, this pull request only changed an HTML file, so the test failures in those checks appear unrelated.
[GitHub] [kafka] nizhikov commented on pull request #9196: [DO NOT MERGE] KAFKA-10402: Upgrade system tests to python3
nizhikov commented on pull request #9196: URL: https://github.com/apache/kafka/pull/9196#issuecomment-697987563 @guozhangwang
> Could you rebase this PR

Done.
[GitHub] [kafka] nizhikov commented on pull request #9312: KAFKA-10505: Fix parsing of generation log string.
nizhikov commented on pull request #9312: URL: https://github.com/apache/kafka/pull/9312#issuecomment-697986003 Thanks for the review and merge!
[GitHub] [kafka] guozhangwang merged pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed
guozhangwang merged pull request #9083: URL: https://github.com/apache/kafka/pull/9083
[GitHub] [kafka] guozhangwang commented on pull request #9196: [DO NOT MERGE] KAFKA-10402: Upgrade system tests to python3
guozhangwang commented on pull request #9196: URL: https://github.com/apache/kafka/pull/9196#issuecomment-697981040 https://github.com/apache/kafka/pull/9312 has been merged. Could you rebase this PR so I can re-trigger the system test suite? As for the ignored test cases, they are known issues and we can keep them "ignored" for now.
[GitHub] [kafka] guozhangwang merged pull request #9312: KAFKA-10505: Fix parsing of generation log string.
guozhangwang merged pull request #9312: URL: https://github.com/apache/kafka/pull/9312
[GitHub] [kafka] guozhangwang commented on pull request #9312: KAFKA-10505: Fix parsing of generation log string.
guozhangwang commented on pull request #9312: URL: https://github.com/apache/kafka/pull/9312#issuecomment-697980115 https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4182/ passed now. I'm going to merge this one.
[GitHub] [kafka] scanterog commented on pull request #9313: [mm2] Fix consumer/producer properties override
scanterog commented on pull request #9313: URL: https://github.com/apache/kafka/pull/9313#issuecomment-697968761 @hachikuji is there any chance you can review this? Thanks!
[GitHub] [kafka] mumrah commented on pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on pull request #9100: URL: https://github.com/apache/kafka/pull/9100#issuecomment-697953307 @hachikuji yea, good catch. This works today using a ZK watch on the partition "/state" znode which is still getting triggered with this PR. We can modify the new ISR update path to explicitly call `onPartitionReassignment` after writing out the ISR. How about we save this as a follow-on?
[GitHub] [kafka] apovzner commented on a change in pull request #9317: KAFKA-10509: Added throttle connection accept rate metric (KIP-612)
apovzner commented on a change in pull request #9317: URL: https://github.com/apache/kafka/pull/9317#discussion_r493869256 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1414,7 +1420,8 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends ListenerReconfigurable { @volatile private var _maxConnections = Int.MaxValue -val connectionRateSensor = createConnectionRateQuotaSensor(Int.MaxValue, Some(listener.value)) +val connectionRateSensor: Sensor = createConnectionRateQuotaSensor(Int.MaxValue, Some(listener.value)) +val connectionRateThrottleSensor: Sensor = createConnectionRateThrottleSensor() Review comment: I added the type annotations because the style check (and the style guide) suggest annotating public members. However, I will change these to package scope, since only the outer class needs to access them. So, it was useful that you asked.
[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
hachikuji commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493823776 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel, val adminZkClient = new AdminZkClient(zkClient) private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId) + /** + * The template to create a forward request handler. + * + * @tparam T request type + * @tparam R response type + * @tparam RK resource key + * @tparam RV resource value + */ + private[server] abstract class ForwardRequestHandler[T <: AbstractRequest, +R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging { + +/** + * Split the given resource into authorized and unauthorized sets. + * + * @return authorized resources and unauthorized resources + */ +def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError]) + +/** + * Controller handling logic of the request. + */ +def process(authorizedResources: Map[RK, RV], +unauthorizedResult: Map[RK, ApiError], +request: T): Unit + +/** + * Build a forward request to the controller. + * + * @param authorizedResources authorized resources by the forwarding broker + * @param request the original request + * @return forward request builder + */ +def createRequestBuilder(authorizedResources: Map[RK, RV], + request: T): AbstractRequest.Builder[T] + +/** + * Merge the forward response with the previously unauthorized results. 
+ * + * @param forwardResponse the forward request's response + * @param unauthorizedResult original unauthorized results + * @return combined response to the original client + */ +def mergeResponse(forwardResponse: R, + unauthorizedResult: Map[RK, ApiError]): R + +def handle(): Unit = { + val requestBody = request.body[AbstractRequest].asInstanceOf[T] + val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody) + if (isForwardingRequest(request)) { +if (!controller.isActive) { + sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception()) + } else { +// For forwarding requests, the authentication failure is not caused by +// the original client, but by the broker. +val unauthorizedResult = unauthorizedResources.keys.map { + resource => resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null) +}.toMap + +process(authorizedResources, unauthorizedResult, requestBody) + } + } else if (!controller.isActive && config.redirectionEnabled && +authorizedResources.nonEmpty) { +redirectionManager.forwardRequest( + createRequestBuilder(authorizedResources, requestBody), Review comment: As discussed offline, we can pass the expected version down to the Builder. The abstract builder already supports an explicit range of versions. In any case, it doesn't seem like we have a choice. By the way, one potential edge case here is that the broker receiving the request has upgraded to a later version than the controller. This would be possible in the middle of a rolling upgrade. I don't think there's an easy way to handle this. We could return UNSUPPORTED_VERSION to the client, but that would be surprising since the client chose a supported API based on ApiVersions and is not aware of the controller redirection. One idea to address this problem is to gate version upgrades to redirectable APIs by the IBP. Basically all of these APIs have become inter-broker APIs through redirection so they need the safeguard of the IBP. 
Feels like we might have to do this.
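The edge case above (a forwarding broker that supports newer API versions than the controller) boils down to picking a request version in the intersection of two supported ranges. As a rough illustration of that negotiation, here is a minimal sketch; the class and method names below are illustrative, not Kafka's actual API:

```java
// Hedged sketch: pick a version for a forwarded request that both the
// forwarding broker and the controller understand. Names are hypothetical.
final class VersionRange {
    final short min, max;
    VersionRange(short min, short max) { this.min = min; this.max = max; }
}

final class ForwardVersionChooser {
    /**
     * Returns the highest version supported by both sides, or -1 if the
     * ranges do not intersect -- the UNSUPPORTED_VERSION situation that can
     * arise mid rolling upgrade, as discussed above.
     */
    static short choose(VersionRange broker, VersionRange controller) {
        short lo = (short) Math.max(broker.min, controller.min);
        short hi = (short) Math.min(broker.max, controller.max);
        return hi >= lo ? hi : -1;
    }

    public static void main(String[] args) {
        // Overlapping ranges: use the highest common version.
        System.out.println(choose(new VersionRange((short) 0, (short) 5),
                                  new VersionRange((short) 0, (short) 3))); // 3
        // Disjoint ranges: no usable version.
        System.out.println(choose(new VersionRange((short) 4, (short) 5),
                                  new VersionRange((short) 0, (short) 3))); // -1
    }
}
```

Gating these versions on the IBP, as suggested, ensures the disjoint case cannot occur once the upgrade completes.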
[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on pull request #8295: URL: https://github.com/apache/kafka/pull/8295#issuecomment-697885454 ok to test
[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on pull request #8295: URL: https://github.com/apache/kafka/pull/8295#issuecomment-697868544 retest this please
[jira] [Commented] (KAFKA-10477) Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record
[ https://issues.apache.org/jira/browse/KAFKA-10477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17201019#comment-17201019 ] Shaik Zakir Hussain commented on KAFKA-10477: - All versions up and above *v2.3.1* are affected by this issue. > Sink Connector fails with DataException when trying to convert Kafka record > with empty key to Connect Record > > > Key: KAFKA-10477 > URL: https://issues.apache.org/jira/browse/KAFKA-10477 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0, 2.3.1 >Reporter: Shaik Zakir Hussain >Assignee: Shaik Zakir Hussain >Priority: Major > > Sink connector is facing a DataException when trying to convert a kafka > record with empty key to Connect data format. > Kafka's trunk branch currently depends on *jackson v2.10.5* > A short unit test (shared below) in > `org.apache.kafka.connect.json.JsonConverterTest` class reproduces the issue. > > {code:java} > @Test > public void testToConnectDataEmptyKey() throws IOException { > Map props = > Collections.singletonMap("schemas.enable", false); > converter.configure(props, true); > String str = ""; > SchemaAndValue schemaAndValue = converter.toConnectData("testTopic", > str.getBytes()); > System.out.println(schemaAndValue); > } > {code} > This test code snippet fails with the following exception: > {noformat} > org.apache.kafka.connect.errors.DataException: Unknown schema type: null > at > org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:764) > at > org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:385) > at > org.apache.kafka.connect.json.JsonConverterTest.testToConnectDataEmptyKey(JsonConverterTest.java:792) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) > {noformat} > > This seems related to the issue > [https://github.com/FasterXML/jackson-databind/issues/2211] , where jackson > lib started returning `MissingNode` for empty input in > 
`ObjectMapper.readTree(input)` method invocation. Precise code change can be > observed here: > [https://github.com/FasterXML/jackson-databind/commit/f0abe41b54b36f43f96f05ab224f6e6f364fbe7a#diff-0d472011dea2aac97f0381097cd1a0bfR4094] > > > This causes an exception to throw up in our JsonConverter class : > [https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L764] > > > In my opinion, when the `jsonValue.getNodeType()`
[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493795436 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel, val adminZkClient = new AdminZkClient(zkClient) private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId) + /** + * The template to create a forward request handler. + * + * @tparam T request type + * @tparam R response type + * @tparam RK resource key + * @tparam RV resource value + */ + private[server] abstract class ForwardRequestHandler[T <: AbstractRequest, +R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging { + +/** + * Split the given resource into authorized and unauthorized sets. + * + * @return authorized resources and unauthorized resources + */ +def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError]) + +/** + * Controller handling logic of the request. + */ +def process(authorizedResources: Map[RK, RV], +unauthorizedResult: Map[RK, ApiError], +request: T): Unit + +/** + * Build a forward request to the controller. + * + * @param authorizedResources authorized resources by the forwarding broker + * @param request the original request + * @return forward request builder + */ +def createRequestBuilder(authorizedResources: Map[RK, RV], + request: T): AbstractRequest.Builder[T] + +/** + * Merge the forward response with the previously unauthorized results. 
+ * + * @param forwardResponse the forward request's response + * @param unauthorizedResult original unauthorized results + * @return combined response to the original client + */ +def mergeResponse(forwardResponse: R, + unauthorizedResult: Map[RK, ApiError]): R + +def handle(): Unit = { + val requestBody = request.body[AbstractRequest].asInstanceOf[T] + val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody) + if (isForwardingRequest(request)) { +if (!controller.isActive) { + sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception()) + } else { +// For forwarding requests, the authentication failure is not caused by +// the original client, but by the broker. +val unauthorizedResult = unauthorizedResources.keys.map { + resource => resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null) +}.toMap + +process(authorizedResources, unauthorizedResult, requestBody) + } + } else if (!controller.isActive && config.redirectionEnabled && +authorizedResources.nonEmpty) { +redirectionManager.forwardRequest( + createRequestBuilder(authorizedResources, requestBody), Review comment: It's a bit hard since we are passing requestBuilder all the way to NetworkClient, so if we want a designated version to build the request, that may involve some non-trivial changes.
[GitHub] [kafka] hachikuji commented on pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…
hachikuji commented on pull request #9284: URL: https://github.com/apache/kafka/pull/9284#issuecomment-697805395 @chia7712 Mentioning the fix in the upgrade notes seems the best we can do. The old (unintended) behavior does not seem worth keeping for the sake of compatibility. @rajinisivaram What do you think?
[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493782410 ## File path: clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java ## @@ -25,23 +25,35 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; public class IncrementalAlterConfigsResponse extends AbstractResponse { -public static IncrementalAlterConfigsResponseData toResponseData(final int requestThrottleMs, - final Map results) { -IncrementalAlterConfigsResponseData responseData = new IncrementalAlterConfigsResponseData(); -responseData.setThrottleTimeMs(requestThrottleMs); -for (Map.Entry entry : results.entrySet()) { -responseData.responses().add(new AlterConfigsResourceResponse(). -setResourceName(entry.getKey().name()). -setResourceType(entry.getKey().type().id()). -setErrorCode(entry.getValue().error().code()). -setErrorMessage(entry.getValue().message())); -} -return responseData; +public IncrementalAlterConfigsResponse(final int requestThrottleMs, + final Map results) { +this.data = new IncrementalAlterConfigsResponseData() +.setThrottleTimeMs(requestThrottleMs); + +addResults(results); +} + +public IncrementalAlterConfigsResponse addResults(final Map results) { Review comment: I guess we could get rid of it and do the merge at the caller level.
[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493773324 ## File path: core/src/main/scala/kafka/server/ConfigHandler.scala ## @@ -203,7 +203,13 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig, if (brokerId == ConfigEntityName.Default) brokerConfig.dynamicConfig.updateDefaultConfig(properties) else if (brokerConfig.brokerId == brokerId.trim.toInt) { - brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties) + val persistentProps = brokerConfig.dynamicConfig.fromPersistentProps(properties, perBrokerConfig = true) + // The filepath was changed for equivalent replacement, which means we should reload + if (brokerConfig.dynamicConfig.trimSSLStorePaths(persistentProps)) { + brokerConfig.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(persistentProps) + } Review comment: I feel it's more explicit to do it here, as the ZK notification is the only target case.
[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493771267 ## File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala ## @@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } } + private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={ +val processedFiles = new mutable.HashSet[String] +reconfigurables + .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains)) +.foreach({ + case reconfigurable: ListenerReconfigurable => +ReloadableFileConfigs.foreach(configName => { + val prefixedName = reconfigurable.listenerName.configPrefix + configName + if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName) && + configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName, ""))) { +val equivalentFileName = configProps.getProperty(prefixedName).replace("/", "//") +configProps.setProperty(prefixedName, equivalentFileName) +processedFiles.add(prefixedName) + } +}) +}) + } + + private[server] def trimSSLStorePaths(configProps: Properties): Boolean = { +var fileChanged = false +val processedFiles = new mutable.HashSet[String] + +reconfigurables + .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains)) + .foreach { +case reconfigurable: ListenerReconfigurable => +ReloadableFileConfigs.foreach(configName => { + val prefixedName = reconfigurable.listenerName.configPrefix + configName + if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName)) { +val configFileName = configProps.getProperty(prefixedName) +val equivalentFileName = configFileName.replace("//", "/") +if (!configFileName.equals(equivalentFileName)) { + fileChanged = true Review comment: Yea
[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493771144 ## File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala ## @@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } } + private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={ +val processedFiles = new mutable.HashSet[String] +reconfigurables + .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains)) +.foreach({ + case reconfigurable: ListenerReconfigurable => +ReloadableFileConfigs.foreach(configName => { + val prefixedName = reconfigurable.listenerName.configPrefix + configName + if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName) && + configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName, ""))) { +val equivalentFileName = configProps.getProperty(prefixedName).replace("/", "//") Review comment: Yes, we would trim it in `trimSslStorePaths`
[GitHub] [kafka] piotrrzysko commented on pull request #9315: KAFKA-10496: Removed relying on external DNS servers in tests
piotrrzysko commented on pull request #9315: URL: https://github.com/apache/kafka/pull/9315#issuecomment-697717628 @jolshan @mumrah Thanks for the review! Of course, I agree with you that it would be nice to use the mock in all tests, but after rethinking the solution suggested in this PR I came to the conclusion that it is not so easy. I assumed that `InetAddress#getAllByName` is the only place where DNS resolution happens - it's not true. The constructor of `InetSocketAddress` that is used by `ClientUtils#parseAndValidateAddresses` may also call an external DNS server in some circumstances (when the passed hostname is a machine name rather than a literal IP address). This behavior is much harder to mock. Perhaps it is not a problem for the tests in `ClientUtilsTest`, because as far as I checked only `localhost` is resolved this way. However, I wouldn't like to introduce an abstraction over the DNS resolution mechanism that is unable to cover all cases. Unfortunately, the only solution that comes to my mind is a reflection-based hack that replaces the default DNS provider in tests. I wanted to avoid that because it can cause problems with future versions of Java. Nevertheless, I can check if it is possible and how ugly it will be :). WDYT?
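The resolution behavior described above is easy to demonstrate: the two-argument `InetSocketAddress` constructor attempts to resolve the hostname eagerly (which can reach an external DNS server), while `InetSocketAddress.createUnresolved` never resolves. A minimal sketch, with illustrative hostnames:

```java
import java.net.InetSocketAddress;

public class DnsResolutionDemo {
    public static void main(String[] args) {
        // The two-arg constructor resolves immediately; for a non-local
        // machine name this may contact an external DNS server, which is
        // exactly what the tests want to avoid.
        InetSocketAddress eager = new InetSocketAddress("localhost", 9092);

        // createUnresolved defers resolution entirely -- no DNS traffic.
        InetSocketAddress lazy =
            InetSocketAddress.createUnresolved("broker.example.invalid", 9092);

        System.out.println(eager.isUnresolved()); // false (localhost resolves locally)
        System.out.println(lazy.isUnresolved());  // true
    }
}
```

This does not help when production code constructs the eager form itself, which is why mocking the JDK's default resolver is hard without reflection.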
[jira] [Updated] (KAFKA-10477) Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record
[ https://issues.apache.org/jira/browse/KAFKA-10477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaik Zakir Hussain updated KAFKA-10477: Affects Version/s: 2.3.1 > Sink Connector fails with DataException when trying to convert Kafka record > with empty key to Connect Record > > > Key: KAFKA-10477 > URL: https://issues.apache.org/jira/browse/KAFKA-10477 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0, 2.3.1 >Reporter: Shaik Zakir Hussain >Assignee: Shaik Zakir Hussain >Priority: Major
[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493752855 ## File path: core/src/main/scala/kafka/server/AdminManager.scala ## @@ -547,7 +553,8 @@ class AdminManager(val config: KafkaConfig, None else { val id = resourceNameToBrokerId(resource.name) - if (id != this.config.brokerId) + // Under redirection, it is possible to handle config changes targeting at brokers other than the controller. Review comment: The logic is needed when an AlterConfigs request targets a specific broker. Since non-controller nodes no longer handle AlterConfigs, the controller may see a redirected config-change request whose broker.id differs from its own broker.id.
[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493751026 ## File path: core/src/main/scala/kafka/server/AdminManager.scala ## @@ -513,15 +513,21 @@ class AdminManager(val config: KafkaConfig, resource -> ApiError.NONE } - private def alterBrokerConfigs(resource: ConfigResource, validateOnly: Boolean, - configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = { + private def alterBrokerConfigs(resource: ConfigResource, + validateOnly: Boolean, + configProps: Properties, + configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = { val brokerId = getBrokerId(resource) val perBrokerConfig = brokerId.nonEmpty this.config.dynamicConfig.validate(configProps, perBrokerConfig) validateConfigPolicy(resource, configEntriesMap) if (!validateOnly) { - if (perBrokerConfig) + if (perBrokerConfig) { +val previousConfigProps = config.dynamicConfig.currentDynamicBrokerConfigs this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps) +this.config.dynamicConfig.maybeAugmentSSLStorePaths(configProps, previousConfigProps) Review comment: The rationale is to trigger a reload of the SSL store file via the ZK notification. @cmccabe @rajinisivaram came up with this idea: augment the path to ``` //path//to//ssl//store//file ``` when a reload is requested on the receiving broker; by propagating such a path, other brokers would see a difference and thus reload their corresponding store files as well. In the meantime, we need to trim the path back to single slashes after handling the notification: ``` /path/to/ssl/store/file ```
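The slash-doubling trick described above relies on the augmented path naming the same file while comparing unequal as a string. A minimal sketch of the round trip; the method names here are illustrative, not Kafka's:

```java
// Hedged sketch of the augment/trim round trip: double the slashes so the
// config value differs (forcing brokers to treat it as changed and reload
// the SSL store), then trim back to the canonical path afterward.
public class SslPathAugmentDemo {
    static String augment(String path) { return path.replace("/", "//"); }
    static String trim(String path)    { return path.replace("//", "/"); }

    public static void main(String[] args) {
        String original = "/path/to/ssl/store/file";
        String augmented = augment(original);

        System.out.println(augmented);                        // //path//to//ssl//store//file
        // The augmented form points at the same file but compares unequal,
        // which is what triggers the reload on other brokers.
        System.out.println(augmented.equals(original));       // false
        // Trimming after the notification restores the canonical path.
        System.out.println(trim(augmented).equals(original)); // true
    }
}
```

The trim step is what `trimSSLStorePaths` in the quoted diff performs on the receiving side.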
[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
hachikuji commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493742677

## File path: core/src/main/scala/kafka/api/ApiVersion.scala

@@ -103,6 +103,9 @@ object ApiVersion {
   KAFKA_2_7_IV0,
   // Bump Fetch protocol for Raft protocol (KIP-595)
   KAFKA_2_7_IV1,
+  // Enable redirection (KIP-590)
+  // TODO: remove this IBP in the 2.7 release if redirection work could not be done before the freeze

Review comment: I'm not sure I follow. Do you not want redirection to be part of 2.7?
[GitHub] [kafka] hachikuji merged pull request #9330: MINOR: Remove unneeded FIXME
hachikuji merged pull request #9330: URL: https://github.com/apache/kafka/pull/9330
[GitHub] [kafka] hachikuji opened a new pull request #9330: MINOR: Remove unneeded FIXME
hachikuji opened a new pull request #9330: URL: https://github.com/apache/kafka/pull/9330

A previous iteration of the Raft patch had a broken check for disconnects. We had fixed the problem, but forgotten to remove the FIXME.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error
zhaohaidao commented on a change in pull request #9311: URL: https://github.com/apache/kafka/pull/9311#discussion_r493708455

## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala

@@ -381,24 +385,35 @@ class TransactionCoordinator(brokerId: Int,
     if (txnMetadata.producerId != producerId)
       Left(Errors.INVALID_PRODUCER_ID_MAPPING)
     // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-    else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+    else if (isFromClient && producerEpoch != txnMetadata.producerEpoch) {
+      if (producerEpoch == txnMetadata.lastProducerEpoch) {
+        Left(Errors.TRANSACTION_TIMED_OUT)
+      } else {
+        Left(Errors.PRODUCER_FENCED)
+      }
+    } else if (producerEpoch < txnMetadata.producerEpoch) {
       Left(Errors.PRODUCER_FENCED)
-    else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
+    } else if (txnMetadata.pendingTransitionInProgress
+      && !txnMetadata.pendingState.contains(PrepareEpochFence)
+      && !txnMetadata.pendingState.contains(PrepareEpochBumpThenAbort))
       Left(Errors.CONCURRENT_TRANSACTIONS)
     else txnMetadata.state match {
       case Ongoing =>
         val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
           PrepareCommit
-        else
+        else {
           PrepareAbort
-
-        if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) {
+        }
+        if (nextState == PrepareAbort && (txnMetadata.pendingState.get == PrepareEpochFence
+          || txnMetadata.pendingState.get == PrepareEpochBumpThenAbort)) {
           // We should clear the pending state to make way for the transition to PrepareAbort and also bump
           // the epoch in the transaction metadata we are about to append.
-          isEpochFence = true
+          isEpochFence = txnMetadata.pendingState.get == PrepareEpochFence

Review comment: According to my understanding, the semantics of PrepareEpochFence and PrepareEpochBumpThenAbort are different.
If a new state is not introduced, what should be passed as the `newState` parameter when `transitionTo` is called?
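The error classification being discussed in the diff above can be restated compactly. This is a simplified Python sketch of the quoted epoch checks, not the actual Kafka code (the function name and return strings are illustrative):

```python
from typing import Optional

def classify_epoch_error(is_from_client: bool, producer_epoch: int,
                         txn_epoch: int, last_producer_epoch: int) -> Optional[str]:
    """Mirror the epoch checks in the quoted TransactionCoordinator diff:
    decide which error, if any, a request should receive."""
    if is_from_client and producer_epoch != txn_epoch:
        # A client retrying with the pre-bump epoch indicates its transaction
        # was timed out and aborted by the coordinator, not fenced by a new
        # producer instance.
        if producer_epoch == last_producer_epoch:
            return "TRANSACTION_TIMED_OUT"
        return "PRODUCER_FENCED"
    if producer_epoch < txn_epoch:
        return "PRODUCER_FENCED"
    return None

assert classify_epoch_error(True, 4, 5, 4) == "TRANSACTION_TIMED_OUT"
assert classify_epoch_error(True, 3, 5, 4) == "PRODUCER_FENCED"
assert classify_epoch_error(True, 5, 5, 4) is None
```

The key distinction is that only a request carrying exactly the previous (`lastProducerEpoch`) value maps to the new `TRANSACTION_TIMED_OUT` error; any other stale epoch still means the producer was fenced.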
[GitHub] [kafka] zhaohaidao commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error
zhaohaidao commented on a change in pull request #9311: URL: https://github.com/apache/kafka/pull/9311#discussion_r493695444

## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

@@ -1072,7 +1076,11 @@ private void transitionTo(State target, RuntimeException error) {
       if (error == null)
         throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
       lastError = error;
+      abortableError = error;
     } else {
+      if (target != State.ABORTING_TRANSACTION) {

Review comment: yes
[GitHub] [kafka] mumrah opened a new pull request #9329: Backport Jenkinsfile to 2.4 branch
mumrah opened a new pull request #9329: URL: https://github.com/apache/kafka/pull/9329

This is a backport of the Jenkinsfile to the 2.4 branch so we can use the PR builder job.
[GitHub] [kafka] viktorsomogyi edited a comment on pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand
viktorsomogyi edited a comment on pull request #4090: URL: https://github.com/apache/kafka/pull/4090#issuecomment-697538096

Rebased the PR and addressed your comment.
[GitHub] [kafka] viktorsomogyi commented on pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand
viktorsomogyi commented on pull request #4090: URL: https://github.com/apache/kafka/pull/4090#issuecomment-697538096

Rebased the PR.
[GitHub] [kafka] mumrah opened a new pull request #9328: Minor: Add deleteDir for streams quickstart test
mumrah opened a new pull request #9328: URL: https://github.com/apache/kafka/pull/9328

Add a `deleteDir` directive for the temporary dir we create during the streams/quickstart archetype test in the Jenkinsfile.
[GitHub] [kafka] mumrah opened a new pull request #9327: Backport Jenkinsfile to 2.5
mumrah opened a new pull request #9327: URL: https://github.com/apache/kafka/pull/9327

Add the Jenkinsfile to the 2.5 branch so we can use the PR builder job.
[jira] [Commented] (KAFKA-8318) Session Window Aggregations generate an extra tombstone
[ https://issues.apache.org/jira/browse/KAFKA-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17200880#comment-17200880 ]

Ilia Pasynkov commented on KAFKA-8318:
Hello, can I pick this issue?

> Session Window Aggregations generate an extra tombstone
> Key: KAFKA-8318
> URL: https://issues.apache.org/jira/browse/KAFKA-8318
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: John Roesler
> Priority: Minor
> Labels: newbie++
>
> See the discussion https://github.com/apache/kafka/pull/6654#discussion_r280231439
> The session merging logic generates a tombstone in addition to an update when the session window already exists. It's not a correctness issue, just a small performance hit, because that tombstone is immediately invalidated by the update.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …
chia7712 commented on pull request #9206: URL: https://github.com/apache/kafka/pull/9206#issuecomment-697508626

Rebased to include the fixes for flaky tests.
[GitHub] [kafka] chia7712 commented on pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 commented on pull request #9318: URL: https://github.com/apache/kafka/pull/9318#issuecomment-697504023

Rebased to fix conflicts.
[GitHub] [kafka] vvcephei merged pull request #9323: KAFKA-10514: Fix unit test for state directory cleanup
vvcephei merged pull request #9323: URL: https://github.com/apache/kafka/pull/9323
[GitHub] [kafka] vvcephei commented on pull request #9323: KAFKA-10514: Fix unit test for state directory cleanup
vvcephei commented on pull request #9323: URL: https://github.com/apache/kafka/pull/9323#issuecomment-697497868

Test failure was unrelated:
```
Build / JDK 11 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
```
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
rajinisivaram commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493520165 ## File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala ## @@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } } + private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={ +val processedFiles = new mutable.HashSet[String] +reconfigurables + .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains)) +.foreach({ + case reconfigurable: ListenerReconfigurable => +ReloadableFileConfigs.foreach(configName => { + val prefixedName = reconfigurable.listenerName.configPrefix + configName + if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName) && + configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName, ""))) { +val equivalentFileName = configProps.getProperty(prefixedName).replace("/", "//") +configProps.setProperty(prefixedName, equivalentFileName) +processedFiles.add(prefixedName) + } +}) +}) + } + + private[server] def trimSSLStorePaths(configProps: Properties): Boolean = { +var fileChanged = false +val processedFiles = new mutable.HashSet[String] + +reconfigurables + .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains)) + .foreach { +case reconfigurable: ListenerReconfigurable => +ReloadableFileConfigs.foreach(configName => { + val prefixedName = reconfigurable.listenerName.configPrefix + configName + if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName)) { +val configFileName = configProps.getProperty(prefixedName) +val equivalentFileName = configFileName.replace("//", "/") +if (!configFileName.equals(equivalentFileName)) { + fileChanged = true Review comment: This means update was requested, but not necessarily that file has changed? 
## File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala ## @@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } } + private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={ Review comment: nit: `SSL` => `Ssl` ## File path: core/src/main/scala/kafka/server/ConfigHandler.scala ## @@ -203,7 +203,13 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig, if (brokerId == ConfigEntityName.Default) brokerConfig.dynamicConfig.updateDefaultConfig(properties) else if (brokerConfig.brokerId == brokerId.trim.toInt) { - brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties) + val persistentProps = brokerConfig.dynamicConfig.fromPersistentProps(properties, perBrokerConfig = true) + // The filepath was changed for equivalent replacement, which means we should reload + if (brokerConfig.dynamicConfig.trimSSLStorePaths(persistentProps)) { + brokerConfig.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(persistentProps) + } Review comment: Can't we put this logic in `DynamicBrokerConfig`? 
## File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala ## @@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } } + private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={ +val processedFiles = new mutable.HashSet[String] +reconfigurables + .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains)) +.foreach({ + case reconfigurable: ListenerReconfigurable => +ReloadableFileConfigs.foreach(configName => { + val prefixedName = reconfigurable.listenerName.configPrefix + configName + if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName) && + configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName, ""))) { +val equivalentFileName = configProps.getProperty(prefixedName).replace("/", "//") +configProps.setProperty(prefixedName, equivalentFileName) +processedFiles.add(prefixedName) + } +}) +}) + } + + private[server] def trimSSLStorePaths(configProps: Properties): Boolean = { Review comment: `SSL` => `Ssl` ## File path:
[jira] [Created] (KAFKA-10516) Implement Topic Command changes
David Jacot created KAFKA-10516:
Summary: Implement Topic Command changes
Key: KAFKA-10516
URL: https://issues.apache.org/jira/browse/KAFKA-10516
Project: Kafka
Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot
[GitHub] [kafka] C0urante commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure
C0urante commented on pull request #8844: URL: https://github.com/apache/kafka/pull/8844#issuecomment-697324142

Hi @michael-carter-instaclustr, unfortunately it can take some time. Usually I tag @rhauch and @kkonstantine on GitHub since they're the committers that work most closely with the Connect framework, but I see you've already done that. You could try reaching out to the dev mailing list and asking for review there. This is a good bug fix and deserves a look; ideally, it shouldn't take months to get something like this reviewed.
[jira] [Assigned] (KAFKA-10484) Reduce Metrics Exposed by Streams
[ https://issues.apache.org/jira/browse/KAFKA-10484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bruno Cadonna reassigned KAFKA-10484:
Assignee: Bruno Cadonna

> Reduce Metrics Exposed by Streams
> Key: KAFKA-10484
> URL: https://issues.apache.org/jira/browse/KAFKA-10484
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.6.0
> Reporter: Bruno Cadonna
> Assignee: Bruno Cadonna
> Priority: Major
>
> In our test cluster, metrics are monitored through a monitoring service. We experienced a couple of times that a Kafka Streams client exceeded the limit of 350 metrics of the monitoring service. When the client exceeds the limit, metrics will be truncated, which might result in false alerts. For example, in our cluster, we monitor the alive stream threads and trigger an alert if a stream thread dies. It happened that when the client exceeded the 350 metrics limit, the alive stream threads metric was truncated, which led to a false alarm.
> The main driver of the high number of metrics are the metrics on task level and below. An example of those metrics are the state store metrics. The number of such metrics per Kafka Streams client is hard to predict since it depends on which tasks are assigned to the client. A stateful task with 5 state stores reports 5 times more state store metrics than a stateful task with only one state store. Sometimes it is possible to only report the metrics of some state stores, but sometimes this is not an option. For example, if we want to monitor the memory usage of RocksDB per Kafka Streams client, we need to report the memory-related metrics of all RocksDB state stores of all tasks assigned to all stream threads of one client.
> One option to reduce the reported metrics is to add a metric that aggregates some state store metrics, e.g., to monitor memory usage, on client level within Kafka Streams.
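The client-level aggregation proposed in the issue can be sketched in a few lines. This is a minimal Python illustration of the idea only; the function and metric names are hypothetical, not Kafka Streams API:

```python
def aggregate_by_client(per_store_metrics: dict) -> float:
    """Collapse per-state-store values (e.g. RocksDB memory usage,
    keyed by task/store) into a single client-level metric, reducing
    the number of metrics exported to the monitoring service."""
    return sum(per_store_metrics.values())

rocksdb_mem_bytes = {
    "task-0_0/store-a": 64.0,
    "task-0_0/store-b": 32.0,
    "task-1_0/store-a": 64.0,
}
# One reported client-level metric instead of three per-store metrics.
assert aggregate_by_client(rocksdb_mem_bytes) == 160.0
```

The trade-off is losing per-store granularity in exchange for a predictable, assignment-independent metric count.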
[GitHub] [kafka] nizhikov commented on pull request #9196: [DO NOT MERGE] KAFKA-10402: Upgrade system tests to python3
nizhikov commented on pull request #9196: URL: https://github.com/apache/kafka/pull/9196#issuecomment-697311780

@edenhill
> The IGNORED tests seems to be mostly kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_upgrade_downgrade_brokers..

Are they ignored in trunk?
[GitHub] [kafka] ankit-kumar-25 opened a new pull request #9326: KAFKA-10460: ReplicaListValidator format checking is incomplete
ankit-kumar-25 opened a new pull request #9326: URL: https://github.com/apache/kafka/pull/9326

What? :: See https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220 . The logic is supposed to accept only two cases:
- a list of k:v pairs
- a single '*'

But in practice, since the disjunction's second part only checks that the head is '*', the case where a k:v list is headed by a star is also accepted (and then later the broker dies at startup, refusing the value). JIRA: https://issues.apache.org/jira/browse/KAFKA-10460

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
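The intended rule — either exactly one '*' or a list of k:v pairs, never a mix — can be sketched as follows. This is a Python illustration of the validation logic, not the Scala implementation; it assumes entries are integer pairs (e.g. partition:broker):

```python
def is_valid_replica_list(value: str) -> bool:
    """Accept either the single wildcard '*' or a comma-separated list
    of integer k:v pairs; a list headed by '*' must be rejected."""
    entries = [e.strip() for e in value.split(",")]
    if entries == ["*"]:
        return True
    # Every entry must be a k:v pair of two integers; '*' anywhere in a
    # multi-entry list therefore fails this check.
    return all(
        len(parts) == 2 and all(p.isdigit() for p in parts)
        for parts in (e.split(":") for e in entries)
    )

assert is_valid_replica_list("*")
assert is_valid_replica_list("0:1,1:2")
assert not is_valid_replica_list("*,0:1")  # the buggy disjunction accepted this
```

The bug described above comes from checking only the head of the list for '*'; validating each entry (with the wildcard allowed only as the sole entry) closes the gap.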
[GitHub] [kafka] nizhikov commented on pull request #9196: [DO NOT MERGE] KAFKA-10402: Upgrade system tests to python3
nizhikov commented on pull request #9196: URL: https://github.com/apache/kafka/pull/9196#issuecomment-697311299

> kafkatest.tests.streams.streams_static_membership_test.StreamsStaticMembershipTest.test_rolling_bounces_will_not_trigger_rebalance_under_static_membership
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_version_probing_upgrade

These two tests are fixed by #9312. AFAIK the two others are failing in trunk too. Is that correct?
[GitHub] [kafka] edenhill commented on pull request #9196: [DO NOT MERGE] KAFKA-10402: Upgrade system tests to python3
edenhill commented on pull request #9196: URL: https://github.com/apache/kafka/pull/9196#issuecomment-697307919 Excerpt from the Jenkins test run: ``` SESSION REPORT (ALL TESTS) ducktape version: 0.7.9 session_id: 2020-09-20--001 run time: 400 minutes 59.682 seconds tests run:606 passed: 432 failed: 4 ignored: 170 . test_id: kafkatest.tests.streams.streams_named_repartition_topic_test.StreamsNamedRepartitionTopicTest.test_upgrade_topology_with_named_repartition_topic status: FAIL run time: 1 minute 59.478 seconds Server connection dropped: Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.9-py2.7.egg/ducktape/tests/runner_client.py", line 134, in run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.9-py2.7.egg/ducktape/tests/runner_client.py", line 192, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py", line 77, in test_upgrade_topology_with_named_repartition_topic verify_running(processor, 'UPDATED Topology') File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/utils/util.py", line 18, in verify_running processor.start() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.9-py2.7.egg/ducktape/services/service.py", line 234, in start self.start_node(node) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/streams.py", line 308, in start_node node.account.create_file(self.CONFIG_FILE, prop_file) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.9-py2.7.egg/ducktape/cluster/remoteaccount.py", line 588, in create_file with 
self.sftp_client.open(path, "w") as f: File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py", line 372, in open t, msg = self._request(CMD_OPEN, filename, imode, attrblock) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py", line 813, in _request return self._read_response(num) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py", line 845, in _read_response raise SSHException("Server connection dropped: {}".format(e)) SSHException: Server connection dropped: .. test_id: kafkatest.tests.streams.streams_static_membership_test.StreamsStaticMembershipTest.test_rolling_bounces_will_not_trigger_rebalance_under_static_membership status: FAIL run time: 2 minutes 2.957 seconds invalid literal for int() with base 10: "Generation{generationId=5,memberId='consumer-A-3-bf3848e8-0d61-4637-81d0-de9f42c95ddf',protocol='stream'} " Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.9-py2.7.egg/ducktape/tests/runner_client.py", line 134, in run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.9-py2.7.egg/ducktape/tests/runner_client.py", line 192, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_static_membership_test.py", line 86, in test_rolling_bounces_will_not_trigger_rebalance_under_static_membership generation = int(generation) ValueError: invalid literal for int() with base 10: "Generation{generationId=5,memberId='consumer-A-3-bf3848e8-0d61-4637-81d0-de9f42c95ddf',protocol='stream'} " 
... test_id: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_version_probing_upgrade status: FAIL run time: 1 minute 22.265 seconds invalid literal for int() with base 10: "Generation{generationId=7,memberId='StreamsUpgradeTest-0cf34158-7f97-4699-8952-acec9013c8e0-StreamThread-1-consumer-5d625105-c972-4545-9967-b85e14274d0d',protocol='stream'} " Traceback (most recent call last): File
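The `ValueError` in the failures above comes from calling `int()` on the entire `Generation{...}` string emitted by the client. Extracting just the `generationId` field avoids the crash; this is a Python sketch of one possible fix, not the actual patch in #9312:

```python
import re

def parse_generation_id(line: str) -> int:
    """Pull generationId out of a 'Generation{...}' log line instead of
    int()-ing the whole line, which raises ValueError."""
    match = re.search(r"generationId=(\d+)", line)
    if match is None:
        raise ValueError(f"no generationId found in: {line!r}")
    return int(match.group(1))

line = ("Generation{generationId=5,memberId='consumer-A-3-bf3848e8-0d61-"
        "4637-81d0-de9f42c95ddf',protocol='stream'} ")
assert parse_generation_id(line) == 5
```

A regex keyed on the field name is also robust to trailing whitespace and to the other fields (`memberId`, `protocol`) changing format.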
[GitHub] [kafka] rajinisivaram merged pull request #9301: KAFKA-10482: Fix flaky testDynamicListenerConnectionCreationRateQuota
rajinisivaram merged pull request #9301: URL: https://github.com/apache/kafka/pull/9301
[jira] [Commented] (KAFKA-8360) Docs do not mention RequestQueueSize JMX metric
[ https://issues.apache.org/jira/browse/KAFKA-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17200749#comment-17200749 ]

ASF GitHub Bot commented on KAFKA-8360:
ankit-kumar-25 edited a comment on pull request #220: URL: https://github.com/apache/kafka-site/pull/220#issuecomment-696188603

Hey @viktorsomogyi, thank you for the pointers. I have created a PR against the ops.html available in the Kafka project: https://github.com/apache/kafka/pull/9325

> Docs do not mention RequestQueueSize JMX metric
> Key: KAFKA-8360
> URL: https://issues.apache.org/jira/browse/KAFKA-8360
> Project: Kafka
> Issue Type: Improvement
> Components: documentation, metrics, network
> Reporter: Charles Francis Larrieu Casias
> Assignee: Ankit Kumar
> Priority: Major
> Labels: documentation
>
> In the [monitoring documentation|https://kafka.apache.org/documentation/#monitoring], there is no mention of the `kafka.network:type=RequestChannel,name=RequestQueueSize` JMX metric. This is an important metric because it can indicate that there are too many requests in queue and suggest either increasing `queued.max.requests` (along with perhaps memory) or increasing `num.io.threads`.
[jira] [Commented] (KAFKA-8360) Docs do not mention RequestQueueSize JMX metric
[ https://issues.apache.org/jira/browse/KAFKA-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17200748#comment-17200748 ]

ASF GitHub Bot commented on KAFKA-8360:
ankit-kumar-25 closed pull request #220: URL: https://github.com/apache/kafka-site/pull/220

> Docs do not mention RequestQueueSize JMX metric
[GitHub] [kafka] ankit-kumar-25 opened a new pull request #9325: KAFKA-8360: Docs do not mention RequestQueueSize JMX metric
ankit-kumar-25 opened a new pull request #9325: URL: https://github.com/apache/kafka/pull/9325

What? :: Mentioning "Request Queue Size" under the Monitoring section. RequestQueueSize is an important metric for monitoring the number of requests in the queue, as a crowded queue might have trouble processing incoming or outgoing requests. @viktorsomogyi Can you please review this? Thanks!
[GitHub] [kafka] nizhikov commented on pull request #9196: [DO NOT MERGE] KAFKA-10402: Upgrade system tests to python3
nizhikov commented on pull request #9196: URL: https://github.com/apache/kafka/pull/9196#issuecomment-697300763

Hello, @guozhangwang. Do you have system test run results? Could you please share them?
[GitHub] [kafka] ankit-kumar-25 closed pull request #9314: KAFKA-8360: Docs do not mention RequestQueueSize JMX metric
ankit-kumar-25 closed pull request #9314: URL: https://github.com/apache/kafka/pull/9314
[GitHub] [kafka] chia7712 opened a new pull request #9324: MINOR: Install "iproute2" explicitly in Dockerfile
chia7712 opened a new pull request #9324: URL: https://github.com/apache/kafka/pull/9324

This patch is similar to https://github.com/apache/kafka/commit/ee68b999c49cbbf514940a81282ff894e6cf50d9. The "iproute2" tool is required by `round_trip_fault_test.py` and is not included in openjdk:11.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cadonna commented on pull request #9323: KAFKA-10514: Fix unit test for state directory cleanup
cadonna commented on pull request #9323: URL: https://github.com/apache/kafka/pull/9323#issuecomment-697269502 Call for review: @chia7712 @vvcephei @mjsax @guozhangwang @abbccdda
[GitHub] [kafka] cadonna opened a new pull request #9323: KAFKA-10514: Fix unit test for state directory cleanup
cadonna opened a new pull request #9323: URL: https://github.com/apache/kafka/pull/9323 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-10515) NPE: Foreign key join serde may not be initialized with default serde if application is distributed
Thorsten Hake created KAFKA-10515: - Summary: NPE: Foreign key join serde may not be initialized with default serde if application is distributed Key: KAFKA-10515 URL: https://issues.apache.org/jira/browse/KAFKA-10515 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.5.1, 2.6.0 Reporter: Thorsten Hake The fix of KAFKA-9517 fixed the initialization of the foreign key joins serdes for KStream applications that do not run distributed over multiple instances. However, if an application runs distributed over multiple instances, the foreign key join serdes may still not be initialized leading to the following NPE: {noformat} Encountered the following error during processing:java.lang.NullPointerException: null at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:85) at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:52) at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59) at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50) at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27) at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487) at 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:102) at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:55) at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104) at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383) at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670){noformat} This happens because the processors for foreign key joins will be distributed across multiple tasks. 
The serde will only be initialized with the default serde during the initialization of the task containing the sink node ("subscription-registration-sink"). So if the task containing the SubscriptionStoreReceiveProcessor ("subscription-receive") is not assigned to the same instance as the task containing the sink node, an NPE will be thrown because the serde of the state store used within the SubscriptionStoreReceiveProcessor is not initialized. -- This message was sent by Atlassian Jira (v8.3.4#803005)
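The failure mode above can be sketched with a minimal model of a lazily configured serde. This is not Kafka Streams code; `LazySerde` and its methods are illustrative stand-ins for the real `SubscriptionWrapperSerde`, which only picks up the default serde when the task holding the sink node is initialized on the same instance:

```java
import java.util.Objects;

// Illustrative stand-in for a serde that falls back to the default serde
// only if some task's initialization happens to configure it.
class LazySerde {
    private String inner; // stays null until configured

    void configureIfNull(String defaultSerde) {
        if (inner == null) inner = defaultSerde;
    }

    byte[] serialize(String value) {
        // Source of the NPE when no task on this instance configured the serde.
        Objects.requireNonNull(inner, "serde not initialized");
        return (inner + ":" + value).getBytes();
    }
}

public class ForeignKeyJoinSerdeSketch {
    public static void main(String[] args) {
        // Instance A hosts the sink task, whose init configures the serde:
        LazySerde sinkTaskCopy = new LazySerde();
        sinkTaskCopy.configureIfNull("default-serde");
        System.out.println(new String(sinkTaskCopy.serialize("fk"))); // default-serde:fk

        // Instance B hosts only the receive task; its serde was never configured:
        LazySerde receiveTaskCopy = new LazySerde();
        try {
            receiveTaskCopy.serialize("fk");
        } catch (NullPointerException e) {
            System.out.println("NPE: " + e.getMessage());
        }
    }
}
```

The sketch shows why the bug only appears when the application is distributed: with a single instance, the same process initializes both tasks, so the serde is always configured before the receive path serializes.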
[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function
chia7712 commented on pull request #9162: URL: https://github.com/apache/kafka/pull/9162#issuecomment-697223860 ``` Build / JDK 8 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] ``` It is flaky on my local machine, so it should be unrelated to this patch. @ijuma Could you take a look?
[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically
tombentley commented on pull request #9266: URL: https://github.com/apache/kafka/pull/9266#issuecomment-697220456 @ijuma sorry about that. Now fixed.
[jira] [Commented] (KAFKA-10514) failed test StateDirectoryTest.shouldLogStateDirCleanerMessage
[ https://issues.apache.org/jira/browse/KAFKA-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17200647#comment-17200647 ] Bruno Cadonna commented on KAFKA-10514: --- Thank you for catching this! This test seems to be flaky for the reason you pointed out. I will submit a PR shortly! > failed test StateDirectoryTest.shouldLogStateDirCleanerMessage > -- > > Key: KAFKA-10514 > URL: https://issues.apache.org/jira/browse/KAFKA-10514 > Project: Kafka > Issue Type: Bug > Components: unit tests >Reporter: Chia-Ping Tsai >Assignee: Bruno Cadonna >Priority: Major > Labels: flaky > > {quote} > java.lang.AssertionError: > Expected: a collection containing a string ending with "ms has elapsed > (cleanup delay is 0ms)." > but: was empty > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldLogStateDirCleanerMessage(StateDirectoryTest.java:569) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at jdk.internal.reflect.GeneratedMethodAccessor9.invoke(Unknown Source) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119) > at jdk.internal.reflect.GeneratedMethodAccessor8.invoke(Unknown Source) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) >
[jira] [Assigned] (KAFKA-10514) failed test StateDirectoryTest.shouldLogStateDirCleanerMessage
[ https://issues.apache.org/jira/browse/KAFKA-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna reassigned KAFKA-10514: - Assignee: Bruno Cadonna > failed test StateDirectoryTest.shouldLogStateDirCleanerMessage > -- > > Key: KAFKA-10514 > URL: https://issues.apache.org/jira/browse/KAFKA-10514 > Project: Kafka > Issue Type: Bug > Components: unit tests > Reporter: Chia-Ping Tsai > Assignee: Bruno Cadonna > Priority: Major > Labels: flaky
[jira] [Commented] (KAFKA-10514) failed test StateDirectoryTest.shouldLogStateDirCleanerMessage
[ https://issues.apache.org/jira/browse/KAFKA-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17200617#comment-17200617 ] Chia-Ping Tsai commented on KAFKA-10514: more details: https://github.com/apache/kafka/pull/9262#issuecomment-697177819 > failed test StateDirectoryTest.shouldLogStateDirCleanerMessage > -- > > Key: KAFKA-10514 > URL: https://issues.apache.org/jira/browse/KAFKA-10514 > Project: Kafka > Issue Type: Bug > Components: unit tests > Reporter: Chia-Ping Tsai > Priority: Major > Labels: flaky
[GitHub] [kafka] chia7712 commented on pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually
chia7712 commented on pull request #9262: URL: https://github.com/apache/kafka/pull/9262#issuecomment-697177819 @cadonna ``` org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldLogStateDirCleanerMessage failed, log available in /home/chia7712/kafka/streams/build/reports/testOutput/org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldLogStateDirCleanerMessage.test.stdout org.apache.kafka.streams.processor.internals.StateDirectoryTest > shouldLogStateDirCleanerMessage FAILED java.lang.AssertionError: Expected: a collection containing a string ending with "ms has elapsed (cleanup delay is 0ms)." but: was empty at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldLogStateDirCleanerMessage(StateDirectoryTest.java:569) ``` The new test introduced by this PR failed. The failed test does not update the current time of ```MockTime```, so the state directory can't be cleaned up. I have filed a ticket for this failed test (https://issues.apache.org/jira/browse/KAFKA-10514).
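The root cause described in that comment can be sketched with a manual clock. This is not Kafka's actual code; `ManualTime` and `maybeCleanAndLog` are illustrative stand-ins for `MockTime` and the state-directory cleaner, under the assumption that the cleaner only removes (and logs about) directories whose age exceeds the cleanup delay:

```java
// Minimal sketch of why the flaky test saw no log line: the mock clock was
// never advanced, so the cleanup condition never became true.
public class StateDirCleanerSketch {
    // Illustrative stand-in for MockTime.
    static final class ManualTime {
        private long nowMs = 0L;
        long milliseconds() { return nowMs; }
        void sleep(long ms) { nowMs += ms; } // advancing time is the step the failing test omitted
    }

    // Assumption: the cleaner acts only once the directory's age strictly
    // exceeds the cleanup delay, so age 0 with delay 0 does nothing.
    static boolean maybeCleanAndLog(ManualTime time, long lastModifiedMs, long cleanupDelayMs) {
        return time.milliseconds() - lastModifiedMs > cleanupDelayMs;
    }

    public static void main(String[] args) {
        ManualTime time = new ManualTime();
        System.out.println(maybeCleanAndLog(time, 0, 0)); // false: clock never advanced, no log emitted
        time.sleep(1);
        System.out.println(maybeCleanAndLog(time, 0, 0)); // true: delay has elapsed, cleanup would log
    }
}
```

With a real wall clock the test usually passes because time moves on its own; with a mock clock the test must advance time explicitly, which is the fix PR #9323 addresses.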
[jira] [Updated] (KAFKA-10514) failed test StateDirectoryTest.shouldLogStateDirCleanerMessage
[ https://issues.apache.org/jira/browse/KAFKA-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-10514: --- Summary: failed test StateDirectoryTest.shouldLogStateDirCleanerMessage (was: flaky test StateDirectoryTest.shouldLogStateDirCleanerMessage) > failed test StateDirectoryTest.shouldLogStateDirCleanerMessage > -- > > Key: KAFKA-10514 > URL: https://issues.apache.org/jira/browse/KAFKA-10514 > Project: Kafka > Issue Type: Bug > Components: unit tests > Reporter: Chia-Ping Tsai > Priority: Major > Labels: flaky
[GitHub] [kafka] chia7712 commented on pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …
chia7712 commented on pull request #9206: URL: https://github.com/apache/kafka/pull/9206#issuecomment-697164367 ``` Build / JDK 15 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota ``` It is tracked by #9301. ``` org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldLogStateDirCleanerMessage ``` https://issues.apache.org/jira/browse/KAFKA-10514
[jira] [Updated] (KAFKA-10514) flaky test StateDirectoryTest.shouldLogStateDirCleanerMessage
[ https://issues.apache.org/jira/browse/KAFKA-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-10514: --- Labels: flaky (was: ) > flaky test StateDirectoryTest.shouldLogStateDirCleanerMessage > - > > Key: KAFKA-10514 > URL: https://issues.apache.org/jira/browse/KAFKA-10514 > Project: Kafka > Issue Type: Bug > Reporter: Chia-Ping Tsai > Priority: Major > Labels: flaky
[jira] [Created] (KAFKA-10514) flaky test StateDirectoryTest.shouldLogStateDirCleanerMessage
Chia-Ping Tsai created KAFKA-10514:
--------------------------------------

             Summary: flaky test StateDirectoryTest.shouldLogStateDirCleanerMessage
                 Key: KAFKA-10514
                 URL: https://issues.apache.org/jira/browse/KAFKA-10514
             Project: Kafka
          Issue Type: Bug
            Reporter: Chia-Ping Tsai

{quote}
java.lang.AssertionError:
Expected: a collection containing a string ending with "ms has elapsed (cleanup delay is 0ms)."
     but: was empty
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
    at org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldLogStateDirCleanerMessage(StateDirectoryTest.java:569)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
    at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at jdk.internal.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
    at jdk.internal.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
    at
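The failing assertion above expects the captured log output to contain a line ending with "ms has elapsed (cleanup delay is 0ms).", but the collection was empty — the classic shape of a timing flake, where the check runs before the background state-dir cleaner has logged anything. A minimal sketch of that check in plain Java (the class and helper names below are hypothetical illustrations, not Kafka or Hamcrest code):

```java
import java.util.List;

public class CleanerLogCheck {

    // Mirrors Hamcrest's "a collection containing a string ending with <suffix>":
    // true only if at least one captured log line ends with the given suffix.
    static boolean containsMessageEndingWith(List<String> capturedLogLines, String suffix) {
        return capturedLogLines.stream().anyMatch(line -> line.endsWith(suffix));
    }

    public static void main(String[] args) {
        String suffix = "ms has elapsed (cleanup delay is 0ms).";

        // Cleaner thread already logged: the assertion would pass.
        List<String> afterCleanerRan = List.of(
            "Deleting obsolete state directory 0_0 for task 0_0 as 10ms has elapsed (cleanup delay is 0ms).");
        System.out.println(containsMessageEndingWith(afterCleanerRan, suffix)); // true

        // Assertion raced ahead of the cleaner thread: the "was empty" failure above.
        List<String> beforeCleanerRan = List.of();
        System.out.println(containsMessageEndingWith(beforeCleanerRan, suffix)); // false
    }
}
```

Checks like this are typically de-flaked by waiting for the log line with a bounded retry rather than asserting on the capture immediately.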
[jira] [Updated] (KAFKA-10514) flaky test StateDirectoryTest.shouldLogStateDirCleanerMessage
[ https://issues.apache.org/jira/browse/KAFKA-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai updated KAFKA-10514:
-----------------------------------
    Component/s: unit tests

> flaky test StateDirectoryTest.shouldLogStateDirCleanerMessage
> -------------------------------------------------------------
>
>                 Key: KAFKA-10514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10514
>             Project: Kafka
>          Issue Type: Bug
>          Components: unit tests
>            Reporter: Chia-Ping Tsai
>            Priority: Major
>              Labels: flaky
>
> {quote}
> java.lang.AssertionError:
> Expected: a collection containing a string ending with "ms has elapsed (cleanup delay is 0ms)."
>      but: was empty
> (stack trace identical to the original report above)
> {quote}