[jira] [Updated] (KAFKA-16250) Consumer group coordinator should perform sanity check on the offset commits.
[ https://issues.apache.org/jira/browse/KAFKA-16250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu updated KAFKA-16250: --- Description: The current coordinator does not validate the offset commits before persisting it in the record. In a real case, though, I am not sure why the consumer generates the offset commits with a consumer offset valued at -2, the "illegal" consumer offset value caused confusion with the admin cli when describing the consumer group. The consumer offset field is marked "". was: The current coordinator does not validate the offset commits before persisting it in the record. In a real case, though, I am not sure why the consumer generates the offset commits with a consumer offset valued at -2, the "illegal" consumer offset value caused confusion with the admin cli when describing the consumer group. The consumer offset field is marked "-". > Consumer group coordinator should perform sanity check on the offset commits. > - > > Key: KAFKA-16250 > URL: https://issues.apache.org/jira/browse/KAFKA-16250 > Project: Kafka > Issue Type: Improvement >Reporter: Calvin Liu >Priority: Major > > The current coordinator does not validate the offset commits before > persisting it in the record. > In a real case, though, I am not sure why the consumer generates the offset > commits with a consumer offset valued at -2, the "illegal" consumer offset > value caused confusion with the admin cli when describing the consumer group. > The consumer offset field is marked "". > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
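The validation requested in KAFKA-16250 could look roughly like the sketch below. It is only an illustration, not the coordinator's code: the class `OffsetCommitSanityCheck` and its methods are hypothetical, and the rule that a committed offset must be non-negative is an assumption drawn from the ticket calling -2 "illegal".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Kafka. */
public class OffsetCommitSanityCheck {

    /** Assumption: a committed offset is acceptable only when it is non-negative. */
    public static boolean isValidOffset(long offset) {
        return offset >= 0;
    }

    /** Returns the partitions whose committed offset fails the check, in iteration order. */
    public static List<String> rejectedPartitions(Map<String, Long> commits) {
        List<String> rejected = new ArrayList<>();
        for (Map.Entry<String, Long> entry : commits.entrySet()) {
            if (!isValidOffset(entry.getValue())) {
                rejected.add(entry.getKey());
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        Map<String, Long> commits = new LinkedHashMap<>();
        commits.put("orders-0", 42L);
        commits.put("orders-1", -2L); // the value reported in the ticket
        System.out.println(rejectedPartitions(commits));
    }
}
```

Rejecting the commit before it is persisted would keep the value from ever reaching tools that later describe the group.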
Re: [PR] Kafka-16668: Add tags support in ClusterTestExtension [kafka]
johnnychhsu commented on code in PR #15861: URL: https://github.com/apache/kafka/pull/15861#discussion_r1592759299 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -153,15 +148,19 @@ public Map> perBrokerOverrideProperties() { return perBrokerOverrideProperties; } -public Map nameTags() { -Map tags = new LinkedHashMap<>(4); -name().ifPresent(name -> tags.put("Name", name)); -tags.put("MetadataVersion", metadataVersion.toString()); -tags.put("Security", securityProtocol.name()); -listenerName().ifPresent(listener -> tags.put("Listener", listener)); +public String[] tags() { return tags; } +public Map nameTags() { Review Comment: since we can get the tags from the getter, why do we still need to add this into the displayTags? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16656) Using a custom replication.policy.separator with DefaultReplicationPolicy
[ https://issues.apache.org/jira/browse/KAFKA-16656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844368#comment-17844368 ]

Chris Egerton commented on KAFKA-16656:
---
Hi [~leninjoseph] it should be possible to use a custom separator with the {{DefaultReplicationPolicy}} class. What are the names of the topics you're seeing cyclical replication for?

I've sketched out a unit test that can be added to the [ReplicationPolicyTest suite|https://github.com/apache/kafka/blob/05df10449eb9c95fe6d6055b302c84686be8058d/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java] that probes how the class handles custom separators and everything appears to be working, but this doesn't rule out the possibility of bugs in other places:

{{@Test}}
{{public void testCustomSeparator() {}}
{{    DefaultReplicationPolicy policyWithCustomSeparator = new DefaultReplicationPolicy();}}
{{    Map config = new HashMap<>();}}
{{    config.put(DefaultReplicationPolicy.SEPARATOR_CONFIG, "-");}}
{{    policyWithCustomSeparator.configure(config);}}
{{    assertEquals("source", policyWithCustomSeparator.topicSource("source-t"));}}
{{    assertEquals("t", policyWithCustomSeparator.upstreamTopic("source-t"));}}
{{}}}

> Using a custom replication.policy.separator with DefaultReplicationPolicy
> -
>
> Key: KAFKA-16656
> URL: https://issues.apache.org/jira/browse/KAFKA-16656
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 3.5.1
> Reporter: Lenin Joseph
> Priority: Major
>
> Hi,
> In the case of bidirectional replication using mm2, when we tried using a
> custom replication.policy.separator( ex: "-") with DefaultReplicationPolicy ,
> we see cyclic replication of topics. Could you confirm whether it's mandatory
> to use a CustomReplicationPolicy whenever we want to use a separator other
> than a "." ?
> Regards,
> Lenin
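To illustrate what the unit test in this comment asserts, here is a simplified, self-contained model of separator-based topic naming. It is a sketch under stated assumptions, not the `DefaultReplicationPolicy` source: the class `SeparatorNamingSketch` is hypothetical, and it only mirrors the behaviour checked in the test (the source is the text before the first separator, the upstream topic is the remainder).

```java
/** Hypothetical, simplified model of separator-based remote topic naming; not Kafka code. */
public class SeparatorNamingSketch {
    private final String separator;

    public SeparatorNamingSketch(String separator) {
        this.separator = separator;
    }

    /** Builds the name a topic would carry on the target cluster in this model. */
    public String formatRemoteTopic(String sourceCluster, String topic) {
        return sourceCluster + separator + topic;
    }

    /** Returns the text before the first separator, or null when the name has no separator. */
    public String topicSource(String topic) {
        int idx = topic.indexOf(separator);
        return idx < 0 ? null : topic.substring(0, idx);
    }

    /** Returns the text after the first separator, or null when the name has no separator. */
    public String upstreamTopic(String topic) {
        int idx = topic.indexOf(separator);
        return idx < 0 ? null : topic.substring(idx + separator.length());
    }
}
```

In this model, any topic whose own name already contains the separator cannot be told apart from a remote topic, which is one reason the concrete topic names requested in the comment matter.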
Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]
viktorsomogyi commented on PR #15697: URL: https://github.com/apache/kafka/pull/15697#issuecomment-2098821932

@soarez in the end I chose the shortcut for detecting leaders before shutdown. The reasoning is involved, and so is the solution that a complete approach would require.

First, the sequence of events is problematic. We update the `LogManager` first and only then try to propagate the event to the controller. At that point the metadata is stale, so I can't rely on it to determine whether partitions have leadership. A workaround would be to subtract the `LogManager`'s data from the metadata cache (i.e. if there is only a single ISR replica and it is the current one, we can treat it as offline in reality). I don't feel that this is a robust solution: it could be prone to race conditions on the network, depending on how requests arrive from the controller while it is still alive. I think it's more robust to just fail if we can't contact the controller.

The second reason is more technical and can be worked around, although that requires a lot of effort. When trying to extract the replica->logdir information from `LogManager`, the only information about logdirs that the event gives me is the `Uuid`. Unfortunately `LogManager` doesn't store the `Uuid` of an offline dir (and besides, I don't think `Uuid` and logdir names are used consistently across the whole module). This could be solved either by propagating both the logdir and the `Uuid` in the events, or by storing offline dirs' `Uuid`s in `LogManager`. I think the latter is problematic because we can't know how long we should keep information about offline dirs, as they might never come back. The former can be done, although it could be a sizeable refactor, and in general I felt that choosing the simpler route now would be more robust. Let me know if you think we should try it.
[jira] [Commented] (KAFKA-16108) Backport fix for KAFKA-16093 to 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844365#comment-17844365 ] Igor Soarez commented on KAFKA-16108: - [~ChrisEgerton] is this still relevant? The priority field is set to blocker but the description says it's not a blocker, what's the correct priority? > Backport fix for KAFKA-16093 to 3.7 > --- > > Key: KAFKA-16108 > URL: https://issues.apache.org/jira/browse/KAFKA-16108 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > Fix For: 3.7.1 > > > A fix for KAFKA-16093 is present on the branches trunk (the version for which > is currently 3.8.0-SNAPSHOT) and 3.6. We are in code freeze for the 3.7.0 > release, and this issue is not a blocker, so it cannot be backported right > now. > We should backport the fix once 3.7.0 has been released and before 3.7.1 is > released. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
raminqaf commented on PR #15601: URL: https://github.com/apache/kafka/pull/15601#issuecomment-2098817942 @gharris1727 Thanks for the feedback! I reverted all the changes you requested and also reverted a couple of other indentation changes that caused a diff. I can go even further and revert and inline the introduced private methods (i.e., `emitInnerJoin` or `putInOuterJoinStore`).
[jira] [Created] (KAFKA-16686) Flakey tests in TopicBasedRemoteLogMetadataManagerTest
Gaurav Narula created KAFKA-16686:
-
Summary: Flakey tests in TopicBasedRemoteLogMetadataManagerTest
Key: KAFKA-16686
URL: https://issues.apache.org/jira/browse/KAFKA-16686
Project: Kafka
Issue Type: Test
Components: Tiered-Storage
Reporter: Gaurav Narula
Assignee: Gaurav Narula

Tests in {{TopicBasedRemoteLogMetadataManagerTest}} flake because {{waitUntilConsumerCatchesUp}} may return before the consumer has caught up on all expected metadata.

Flakiness report [here|https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest].
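One way to avoid returning early is to wait until every expected partition has reached its target offset, rather than stopping as soon as some progress is seen. The sketch below is an assumption about the shape of such a check, not the actual fix: `CatchUpWaiter`, `allCaughtUp`, and `waitUntil` are hypothetical names, and the offset maps stand in for whatever state the real test inspects.

```java
import java.util.Map;
import java.util.function.BooleanSupplier;

/** Hypothetical helper for illustration; not part of Kafka's test utilities. */
public class CatchUpWaiter {

    /** True only when every expected partition has been consumed up to its expected offset. */
    public static boolean allCaughtUp(Map<Integer, Long> expected, Map<Integer, Long> consumed) {
        for (Map.Entry<Integer, Long> entry : expected.entrySet()) {
            Long offset = consumed.get(entry.getKey());
            if (offset == null || offset < entry.getValue()) {
                return false;
            }
        }
        return true;
    }

    /** Polls the condition until it holds or the timeout elapses; returns whether it held. */
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long start = System.nanoTime();
        long timeoutNanos = timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - start >= timeoutNanos) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A test would then call something like `waitUntil(() -> allCaughtUp(expected, consumed), 30_000, 100)` and fail if it returns false.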
Re: [PR] KAFKA-16287: Implement example tests for common rebalance callback [kafka]
cadonna commented on code in PR #15408: URL: https://github.com/apache/kafka/pull/15408#discussion_r1592700226 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala: ## @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package integration.kafka.api + +import kafka.api.{AbstractConsumerTest, BaseConsumerTest} +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener} +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.{Arguments, MethodSource} + +import java.util +import java.util.Arrays.asList +import java.util.Collections +import java.util.concurrent.atomic.AtomicBoolean +import java.util.stream.Stream + +/** + * Integration tests for the consumer that cover interaction with the consumer from within callbacks + * and listeners. 
+ */ +class PlaintextConsumerCallbackTest extends AbstractConsumerTest { + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumerRebalanceListenerAssignOnPartitionsAssigned(quorum: String, groupProtocol: String): Unit = { +val tp = new TopicPartition(topic, 0); +triggerOnPartitionsAssigned { (consumer, _) => + val e: Exception = assertThrows(classOf[IllegalStateException], () => consumer.assign(Collections.singletonList(tp))) + assertEquals(e.getMessage, "Subscription to topics, partitions and pattern are mutually exclusive") Review Comment: Are you proposing to change the message of the exception that is verified here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: enable test for ensureInternalEndpointIsSecured [kafka]
FrankYang0529 commented on code in PR #15868: URL: https://github.com/apache/kafka/pull/15868#discussion_r1592700292 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java: ## @@ -115,11 +112,11 @@ public void ensureInternalEndpointIsSecured() throws Throwable { // Try again, but with an invalid signature log.info( "Making a POST request to the {} endpoint with no connector started and an invalid signature header; " -+ "expecting 403 error response", ++ "expecting 503 error response", connectorTasksEndpoint ); assertEquals( -FORBIDDEN.getStatusCode(), +SERVICE_UNAVAILABLE.getStatusCode(), Review Comment: This was added in https://github.com/apache/kafka/pull/11783. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: enable test for ensureInternalEndpointIsSecured [kafka]
FrankYang0529 commented on code in PR #15868: URL: https://github.com/apache/kafka/pull/15868#discussion_r1592697934 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java: ## @@ -115,11 +112,11 @@ public void ensureInternalEndpointIsSecured() throws Throwable { // Try again, but with an invalid signature log.info( "Making a POST request to the {} endpoint with no connector started and an invalid signature header; " -+ "expecting 403 error response", ++ "expecting 503 error response", connectorTasksEndpoint ); assertEquals( -FORBIDDEN.getStatusCode(), +SERVICE_UNAVAILABLE.getStatusCode(), Review Comment: Here, if the connector is not started, there is no `sessionKey`. https://github.com/apache/kafka/blob/05df10449eb9c95fe6d6055b302c84686be8058d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2748-L2752 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
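The order of checks implied by this comment can be modelled as follows. This is a simplified sketch under stated assumptions, not the `DistributedHerder` code: `InternalRequestCheck` and its parameters are hypothetical, and the only behaviour taken from the thread is that a missing session key yields 503 before the signature is ever compared, while an invalid signature with a key present yields 403.

```java
import java.util.function.Predicate;

/** Hypothetical model of the check order discussed in the review; not Kafka Connect code. */
public class InternalRequestCheck {
    public static final int OK = 200;                  // placeholder for "request accepted"
    public static final int FORBIDDEN = 403;
    public static final int SERVICE_UNAVAILABLE = 503;

    /**
     * @param sessionKey          the worker's session key, or null if none has been read yet
     * @param signatureIsValidFor returns true when the request signature verifies against the key
     */
    public static int validate(String sessionKey, Predicate<String> signatureIsValidFor) {
        if (sessionKey == null) {
            // No key available: the signature cannot be verified at all.
            return SERVICE_UNAVAILABLE;
        }
        if (!signatureIsValidFor.test(sessionKey)) {
            return FORBIDDEN;
        }
        return OK;
    }
}
```

With no session key, even a request carrying a bad signature gets 503, which matches the updated assertion in the diff.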
[jira] [Assigned] (KAFKA-16681) Rewrite MiniKDC by Java
[ https://issues.apache.org/jira/browse/KAFKA-16681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16681: -- Assignee: PoAn Yang (was: Chia-Ping Tsai) > Rewrite MiniKDC by Java > --- > > Key: KAFKA-16681 > URL: https://issues.apache.org/jira/browse/KAFKA-16681 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Major > > Noted: > # we need to move it from scala folder to java folder > # don't change the package name since system tests requires it -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-13329) Connect does not perform preflight validation for per-connector key and value converters
[ https://issues.apache.org/jira/browse/KAFKA-13329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-13329. --- Fix Version/s: 3.8.0 Resolution: Fixed > Connect does not perform preflight validation for per-connector key and value > converters > > > Key: KAFKA-13329 > URL: https://issues.apache.org/jira/browse/KAFKA-13329 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 3.8.0 > > > Users may specify a key and/or value converter class for their connector > directly in the configuration for that connector. If this occurs, no > preflight validation is performed to ensure that the specified converter is > valid. > Unfortunately, the [Converter > interface|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java] > does not require converters to expose a {{ConfigDef}} (unlike the > [HeaderConverter > interface|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L48-L52], > which does have that requirement), so it's unlikely that the configuration > properties of the converter itself can be validated. > However, we can and should still validate that the converter class exists, > can be instantiated (i.e., has a public, no-args constructor and is a > concrete, non-abstract class), and implements the {{Converter}} interface. > *EDIT:* Since this ticket was originally filed, a {{Converter::config}} > method was added in > [KIP-769|https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+APIs+to+list+all+connector+plugins+and+retrieve+their+configuration+definitions]. > We can now utilize that config definition during preflight validation for > connectors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
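The class-level checks listed in this ticket (class exists, implements the interface, is concrete, has a public no-args constructor) can be sketched with plain reflection. This is an illustration only, not the validation that was merged: `ConverterPreflightSketch` is a hypothetical class, and its nested `Converter` interface is a stand-in for `org.apache.kafka.connect.storage.Converter` so that the example stays self-contained.

```java
import java.lang.reflect.Modifier;
import java.util.Optional;

/** Hypothetical preflight check for illustration; not the Kafka Connect implementation. */
public class ConverterPreflightSketch {

    /** Stand-in for the real Converter interface. */
    public interface Converter { }

    /** A concrete converter with a public no-args constructor. */
    public static class GoodConverter implements Converter {
        public GoodConverter() { }
    }

    /** An abstract converter, which cannot be instantiated. */
    public abstract static class AbstractConverter implements Converter { }

    /** Returns an error message, or an empty Optional when every check passes. */
    public static Optional<String> validate(String className) {
        Class<?> klass;
        try {
            klass = Class.forName(className);
        } catch (ClassNotFoundException e) {
            return Optional.of("Class " + className + " could not be found");
        }
        if (!Converter.class.isAssignableFrom(klass)) {
            return Optional.of("Class " + className + " does not implement Converter");
        }
        if (klass.isInterface() || Modifier.isAbstract(klass.getModifiers())) {
            return Optional.of("Class " + className + " is abstract and cannot be instantiated");
        }
        try {
            // getConstructor() only finds a public no-args constructor.
            klass.getConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            return Optional.of("Class " + className + " could not be instantiated: " + e);
        }
        return Optional.empty();
    }
}
```

Returning a message instead of throwing lets the caller attach the error to the relevant config value, so the user sees it in the validation response.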
[jira] [Resolved] (KAFKA-13328) Connect does not perform preflight validation for per-connector header converters
[ https://issues.apache.org/jira/browse/KAFKA-13328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-13328. --- Fix Version/s: 3.8.0 Resolution: Fixed > Connect does not perform preflight validation for per-connector header > converters > - > > Key: KAFKA-13328 > URL: https://issues.apache.org/jira/browse/KAFKA-13328 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 3.8.0 > > > Users may specify a header converter class for their connector directly in > the configuration for that connector. If this occurs, no preflight validation > is performed to ensure that the specified converter is valid. > {{HeaderConverter}} implementations are required to provide a valid > {{ConfigDef}} to the Connect framework via > [HeaderConverter::config|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L48-L52], > but this object isn't actually leveraged anywhere by Connect. > Connect should make use of this config object during preflight validation for > connectors to fail faster when their header converters are misconfigured. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16681) Rewrite MiniKDC by Java
[ https://issues.apache.org/jira/browse/KAFKA-16681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844345#comment-17844345 ] PoAn Yang commented on KAFKA-16681: --- Hi [~chia7712], I'm interested in this. If you are not working on it, may I assign it to myself? Thank you. > Rewrite MiniKDC by Java > --- > > Key: KAFKA-16681 > URL: https://issues.apache.org/jira/browse/KAFKA-16681 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > Noted: > # we need to move it from scala folder to java folder > # don't change the package name since system tests requires it
Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]
C0urante merged PR #14309: URL: https://github.com/apache/kafka/pull/14309
Re: [PR] Allowing WriteTxnMarkers API to run with AlterCluster permissions [kafka]
jolshan commented on PR #15837: URL: https://github.com/apache/kafka/pull/15837#issuecomment-2098727340 Sorry I was out of town (at KSB). I will try to take a look today, but thanks Luke for approving as well :)
Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]
C0urante commented on PR #14309: URL: https://github.com/apache/kafka/pull/14309#issuecomment-2098726331 Thanks @gharris1727. I've reverted the changes to `Utils.java` and verified locally with tests. Everything else looked okay on the previous CI run, going to merge.
Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
gharris1727 commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1591293880 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -184,20 +146,34 @@ public void process(final Record record) { // This condition below allows us to process the out-of-order records without the need // to hold it in the temporary outer store if (!outerJoinStore.isPresent() || timeTo < sharedTimeTracker.streamTime) { - context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); +context().forward( +record.withValue(joiner.apply(record.key(), record.value(), null))); Review Comment: nit: revert ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -43,95 +42,73 @@ import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; -class KStreamKStreamJoin implements ProcessorSupplier { +abstract class KStreamKStreamJoin implements ProcessorSupplier { private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); - -private final String otherWindowName; +private final boolean outer; +private final ValueJoinerWithKey joiner; private final long joinBeforeMs; private final long joinAfterMs; private final long joinGraceMs; +private final String otherWindowName; +private final TimeTrackerSupplier sharedTimeTrackerSupplier; private final boolean enableSpuriousResultFix; +private final Optional outerJoinWindowName; private final long windowsBeforeMs; private final long windowsAfterMs; -private final boolean outer; -private final boolean isLeftSide; -private final Optional outerJoinWindowName; -private final ValueJoinerWithKey joiner; - -private final TimeTrackerSupplier sharedTimeTrackerSupplier; - -KStreamKStreamJoin(final boolean isLeftSide, - final String 
otherWindowName, - final JoinWindowsInternal windows, - final ValueJoinerWithKey joiner, - final boolean outer, - final Optional outerJoinWindowName, - final TimeTrackerSupplier sharedTimeTrackerSupplier) { -this.isLeftSide = isLeftSide; +KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier sharedTimeTrackerSupplier, + final Optional outerJoinWindowName, final long joinBeforeMs, + final long joinAfterMs, final JoinWindowsInternal windows, final boolean outer, + final ValueJoinerWithKey joiner) { Review Comment: Revert the re-ordering here that isn't necessary, so it's easy to see the signature change for `joiner` and the removal of `isLeftSide`. We should also put each constructor argument on it's own to follow the previous style. ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -43,95 +42,73 @@ import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; -class KStreamKStreamJoin implements ProcessorSupplier { +abstract class KStreamKStreamJoin implements ProcessorSupplier { private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); - -private final String otherWindowName; +private final boolean outer; +private final ValueJoinerWithKey joiner; private final long joinBeforeMs; private final long joinAfterMs; private final long joinGraceMs; +private final String otherWindowName; +private final TimeTrackerSupplier sharedTimeTrackerSupplier; private final boolean enableSpuriousResultFix; +private final Optional outerJoinWindowName; private final long windowsBeforeMs; private final long windowsAfterMs; -private final boolean outer; -private final boolean isLeftSide; -private final Optional outerJoinWindowName; -private final ValueJoinerWithKey joiner; - -private final TimeTrackerSupplier 
sharedTimeTrackerSupplier; - -KStreamKStreamJoin(final boolean isLeftSide, - final String otherWindowName, - final JoinWindowsInternal windows, - final ValueJoinerWithKey joiner, - final boolean outer, - final Optional outerJoinWindowName, - final TimeTrackerSupplier
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15821: URL: https://github.com/apache/kafka/pull/15821#issuecomment-2098714976 Hi @chia7712, I rebased onto the latest trunk, so we now have `ConsumerGroupCommandTestUtils`. The only remaining part is `SimpleConsumerGroupExecutor`, which is used by `ListConsumerGroupTest` and `DescribeConsumerGroupTest`. I think we can create a new class `SimpleConsumerGroupExecutorTestUtils` for it. WDYT? Thank you. https://github.com/apache/kafka/blob/21bf715622e9d05984fa8a2a1f9f12d54b76ce41/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java#L327-L335
Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]
C0urante commented on code in PR #14309: URL: https://github.com/apache/kafka/pull/14309#discussion_r1592678706 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -392,6 +399,146 @@ protected Map validateSourceConnectorConfig(SourceConnector return configDef.validateAll(config); } +/** + * General-purpose validation logic for converters that are configured directly + * in a connector config (as opposed to inherited from the worker config). + * @param connectorConfig the configuration for the connector; may not be null + * @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config; + * may be null, in which case no validation will be performed under the assumption that the + * connector will use inherit the converter settings from the worker + * @param pluginInterface the interface for the plugin type + *(e.g., {@code org.apache.kafka.connect.storage.Converter.class}); + *may not be null + * @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef} + * from an instance of the plugin type (e.g., {@code Converter::config}); + * may not be null + * @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"}); + * may not be null + * @param pluginProperty the property used to define a custom class for the plugin type + * in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG}); + * may not be null + * @param defaultProperties any default properties to include in the configuration that will be used for + * the plugin; may be null + + * @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config, + * or null if no custom validation was performed (possibly because no custom plugin was defined in the connector + * config) + + * @param the plugin class to perform validation for + */ +private ConfigInfos validateConverterConfig( +Map connectorConfig, 
+ConfigValue pluginConfigValue, +Class pluginInterface, +Function configDefAccessor, +String pluginName, +String pluginProperty, +Map defaultProperties +) { +Objects.requireNonNull(connectorConfig); +Objects.requireNonNull(pluginInterface); +Objects.requireNonNull(configDefAccessor); +Objects.requireNonNull(pluginName); +Objects.requireNonNull(pluginProperty); + +String pluginClass = connectorConfig.get(pluginProperty); + +if (pluginClass == null +|| pluginConfigValue == null +|| !pluginConfigValue.errorMessages().isEmpty() +) { +// Either no custom converter was specified, or one was specified but there's a problem with it. +// No need to proceed any further. +return null; +} + +T pluginInstance; +try { +pluginInstance = Utils.newInstance(pluginClass, pluginInterface); +} catch (ClassNotFoundException | RuntimeException e) { +log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", pluginName, pluginClass, e); +pluginConfigValue.addErrorMessage("Failed to load class " + pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : "")); +return null; +} + +try { +ConfigDef configDef; +try { +configDef = configDefAccessor.apply(pluginInstance); +} catch (RuntimeException e) { +log.error("Failed to load ConfigDef from {} of type {}", pluginName, pluginClass, e); +pluginConfigValue.addErrorMessage("Failed to load ConfigDef from " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : "")); +return null; +} +if (configDef == null) { +log.warn("{}.config() has returned a null ConfigDef; no further preflight config validation for this converter will be performed", pluginClass); +// Older versions of Connect didn't do any converter validation. 
+// Even though converters are technically required to return a non-null ConfigDef object from their config() method, +// we permit this case in order to avoid breaking existing converters that, despite not adhering to this requirement, +// can be used successfully with a connector. +return null; +} +final String
Re: [PR] MINOR: use classic consumer with ZK mode for DeleteOffsetsConsumerGroupCommandIntegrationTest [kafka]
FrankYang0529 commented on code in PR #15872: URL: https://github.com/apache/kafka/pull/15872#discussion_r1592664550 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -88,7 +113,7 @@ public void testDeleteOffsetsNonExistingGroup() { } } -@ClusterTest +@ClusterTemplate("generator") public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() { for (Map consumerConfig: consumerConfigs) { Review Comment: Updated it. Thanks.
Re: [PR] MINOR: enable test for ensureInternalEndpointIsSecured [kafka]
chia7712 commented on code in PR #15868: URL: https://github.com/apache/kafka/pull/15868#discussion_r1592649000 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java: ## @@ -115,11 +112,11 @@ public void ensureInternalEndpointIsSecured() throws Throwable { // Try again, but with an invalid signature log.info( "Making a POST request to the {} endpoint with no connector started and an invalid signature header; " -+ "expecting 403 error response", ++ "expecting 503 error response", connectorTasksEndpoint ); assertEquals( -FORBIDDEN.getStatusCode(), +SERVICE_UNAVAILABLE.getStatusCode(), Review Comment: Could you share the details with me? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: merge unit test down to the class of integration test [kafka]
KevinZTW opened a new pull request, #15884: URL: https://github.com/apache/kafka/pull/15884 Merge the unit test functions into the integration test class to avoid having two test classes in the same file. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-16645) CVEs in 3.7.0 docker image
[ https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844334#comment-17844334 ] Igor Soarez commented on KAFKA-16645: - The vulnerability report flags {{libexpat}} version {{2.5.0-r2}}. Both {{apache/kafka:3.7.0}} and {{apache/kafka:latest}} ship with the library:
{code:java}
$ docker run --rm -it apache/kafka:3.7.0 -- apk list | grep libexpat
libexpat-2.5.0-r2 aarch64 {expat} (MIT) [installed]
$ docker run --rm -it apache/kafka:latest -- apk list | grep libexpat
libexpat-2.5.0-r2 aarch64 {expat} (MIT) [installed]
{code}
Neither Kafka nor its container image directly depends on {{libexpat}}. The library is instead bundled into the [base image {{eclipse-temurin:21-jre-alpine}}|https://github.com/apache/kafka/blob/21bf715622e9d05984fa8a2a1f9f12d54b76ce41/docker/jvm/Dockerfile#L44].
{code:java}
$ docker run --rm -it eclipse-temurin:21-jre-alpine -- apk list | grep libexpat
libexpat-2.6.2-r0 aarch64 {expat} (MIT) [installed]
$ docker inspect eclipse-temurin:21-jre-alpine | jq -r '.[0].Created'
2024-04-23T20:51:38Z
$ docker inspect apache/kafka:3.7.0 | jq -r '.[0].Created'
2024-02-09T14:51:42.808028351Z
$ docker inspect apache/kafka:latest | jq -r '.[0].Created'
2024-02-09T14:51:42.808028351Z
{code}
The vulnerability has already been addressed in the base image, under the same image tag. To confirm, I ran the vulnerability scanner against a locally built image.
{code:java}
$ python docker_build_test.py kafka/test -tag=localkafkaimg -type=jvm -u=https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
(...)
$ docker run -it -v /var/run/docker.sock:/var/run/docker.sock aquasec/trivy image kafka/test:localkafkaimg -s CRITICAL,HIGH
2024-05-07T14:57:18Z INFO Need to update DB
2024-05-07T14:57:18Z INFO Downloading DB... repository="ghcr.io/aquasecurity/trivy-db:2"
45.90 MiB / 45.90 MiB [---] 100.00% 20.98 MiB p/s 2.4s
2024-05-07T14:57:22Z INFO Vulnerability scanning is enabled
2024-05-07T14:57:22Z INFO Secret scanning is enabled
2024-05-07T14:57:22Z INFO If your scanning is slow, please try '--scanners vuln' to disable secret scanning
2024-05-07T14:57:22Z INFO Please see also https://aquasecurity.github.io/trivy/v0.51/docs/scanner/secret/#recommendation for faster secret detection
2024-05-07T14:57:23Z INFO Java DB Repository repository=ghcr.io/aquasecurity/trivy-java-db:1
2024-05-07T14:57:23Z INFO Downloading the Java DB...
606.06 MiB / 606.06 MiB [--] 100.00% 24.19 MiB p/s 25s
2024-05-07T14:57:49Z INFO The Java DB is cached for 3 days. If you want to update the database more frequently, the '--reset' flag clears the DB cache.
2024-05-07T14:57:49Z INFO Detected OS family="alpine" version="3.19.1"
2024-05-07T14:57:49Z INFO [alpine] Detecting vulnerabilities... os_version="3.19" repository="3.19" pkg_num=43
2024-05-07T14:57:49Z INFO Number of language-specific files num=1
2024-05-07T14:57:49Z INFO [jar] Detecting vulnerabilities...
kafka/test:localkafkaimg (alpine 3.19.1)
Total: 0 (HIGH: 0, CRITICAL: 0)
{code}
I don't think we republish releases without a version change. Unless we want to make an exception for container images and republish the {{3.7.0}} and {{latest}} tags now, I propose we take no action here, as the next images built will not have these issues. Please let me know if you disagree.
> CVEs in 3.7.0 docker image > -- > > Key: KAFKA-16645 > URL: https://issues.apache.org/jira/browse/KAFKA-16645 > Project: Kafka > Issue Type: Task >Affects Versions: 3.7.0 >Reporter: Mickael Maison >Priority: Blocker > Fix For: 3.8.0, 3.7.1 > > > Our [Docker Image CVE > Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub > action reports 2 high CVEs in our base image: > apache/kafka:3.7.0 (alpine 3.19.1) > == > Total: 2 (HIGH: 2, CRITICAL: 0) > ┌──┬┬──┬┬───┬───┬─┐ > │ Library │ Vulnerability │ Severity │ Status │ Installed Version │ Fixed > Version │Title│ > ├──┼┼──┼┼───┼───┼─┤ > │ libexpat │ CVE-2023-52425 │ HIGH │ fixed │ 2.5.0-r2 │ > 2.6.0-r0 │ expat: parsing large tokens can trigger a denial of
[PR] MINOR: Rewrite OptimizedUniformAssignmentBuilder#assignStickyPartitions to improve performance [kafka]
dajac opened a new pull request, #15883: URL: https://github.com/apache/kafka/pull/15883 This PR is still WIP. I am still playing with an alternative approach.

Trunk:
```
Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionModel)  (topicCount)  Mode  Cnt   Score   Error  Units
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL         UNIFORM          false              1                         10          HOMOGENEOUS           100  avgt    5  27.636 ± 0.131  ms/op
```

Patch:
```
Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionModel)  (topicCount)  Mode  Cnt   Score   Error  Units
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL         UNIFORM          false              1                         10          HOMOGENEOUS           100  avgt    5  20.868 ± 0.320  ms/op
```

### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]
chia7712 commented on code in PR #15863: URL: https://github.com/apache/kafka/pull/15863#discussion_r1592508554 ## core/src/main/scala/kafka/log/LogCleaner.scala: ## @@ -101,6 +101,7 @@ class LogCleaner(initialConfig: CleanerConfig, time: Time = Time.SYSTEM) extends Logging with BrokerReconfigurable { // Visible for test. private[log] val metricsGroup = new KafkaMetricsGroup(this.getClass) + activateMetrics() Review Comment: As it gets called in `startup`, do we need to call it in construction? ## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -118,6 +118,23 @@ class LogCleanerTest extends Logging { } } + @Test + def testMetricsActiveAfterReconfiguration(): Unit = { +val logCleaner = new LogCleaner(new CleanerConfig(true), + logDirs = Array(TestUtils.tempDir()), + logs = new Pool[TopicPartition, UnifiedLog](), + logDirFailureChannel = new LogDirFailureChannel(1), + time = time) + +try { + logCleaner.reconfigure(new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181")), +new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))) + + LogCleaner.MetricNames.foreach(name => assertNotNull(KafkaYammerMetrics.defaultRegistry.allMetrics().get(logCleaner.metricsGroup Review Comment: We can use `MetricName#getName` to simplify the code. For example: ```scala val nonexistent = LogCleaner.MetricNames.diff(KafkaYammerMetrics.defaultRegistry.allMetrics().keySet().asScala.map(_.getName)) assertEquals(0, nonexistent.size, s"$nonexistent should be existent") ``` ## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -118,6 +118,23 @@ class LogCleanerTest extends Logging { } } + @Test + def testMetricsActiveAfterReconfiguration(): Unit = { +val logCleaner = new LogCleaner(new CleanerConfig(true), + logDirs = Array(TestUtils.tempDir()), + logs = new Pool[TopicPartition, UnifiedLog](), + logDirFailureChannel = new LogDirFailureChannel(1), + time = time) + Review Comment: Could you please add `startup` and then check the metrics get created? 
Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]
chiacyu commented on code in PR #15863: URL: https://github.com/apache/kafka/pull/15863#discussion_r1592516715 ## core/src/main/scala/kafka/log/LogCleaner.scala: ## @@ -159,6 +159,7 @@ class LogCleaner(initialConfig: CleanerConfig, cleaners += cleaner cleaner.start() } +activateMetrics(); Review Comment: Already added the test on the JIRA page: [KAFKA-16574](https://issues.apache.org/jira/browse/KAFKA-16574). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]
chiacyu commented on code in PR #15863: URL: https://github.com/apache/kafka/pull/15863#discussion_r1592509827 ## core/src/main/scala/kafka/log/LogCleaner.scala: ## @@ -182,6 +183,27 @@ class LogCleaner(initialConfig: CleanerConfig, cleanerManager.removeMetrics() Review Comment: Yes, the metrics remain removed after reconfiguring. Would it be a good idea to remove the `cleanerManager.removeMetrics()` line?
[jira] [Resolved] (KAFKA-16307) fix EventAccumulator thread idle ratio metric
[ https://issues.apache.org/jira/browse/KAFKA-16307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-16307. - Fix Version/s: 3.8.0 Reviewer: David Jacot Resolution: Fixed > fix EventAccumulator thread idle ratio metric > - > > Key: KAFKA-16307 > URL: https://issues.apache.org/jira/browse/KAFKA-16307 > Project: Kafka > Issue Type: Sub-task >Reporter: Jeff Kim >Assignee: Jeff Kim >Priority: Major > Fix For: 3.8.0 > > > The metric does not seem to be accurate, nor reporting metrics at every > interval. Requires investigation -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]
dajac merged PR #15835: URL: https://github.com/apache/kafka/pull/15835
[PR] KAFKA-10199: Handle assignment with new remove operation in state updater [kafka]
cadonna opened a new pull request, #15882: URL: https://github.com/apache/kafka/pull/15882 Uses the new remove operation of the state updater that returns a future to handle task assignment. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[PR] KAFKA-16640: Replace TestUtils#resource by scala.util.Using [kafka]
frankvicky opened a new pull request, #15881: URL: https://github.com/apache/kafka/pull/15881 Check all uses of `TestUtils#resource` and replace with `scala.util.Using` - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[PR] KAFKA-16685: Add parent exception to RLMTask warning logs [kafka]
jeqo opened a new pull request, #15880: URL: https://github.com/apache/kafka/pull/15880 [KAFKA-16685]
[jira] [Updated] (KAFKA-16685) RLMTask warning logs do not include parent exception trace
[ https://issues.apache.org/jira/browse/KAFKA-16685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jorge Esteban Quilcate Otoya updated KAFKA-16685: - Summary: RLMTask warning logs do not include parent exception trace (was: RLMTask warn logs do not include parent exception trace) > RLMTask warning logs do not include parent exception trace > -- > > Key: KAFKA-16685 > URL: https://issues.apache.org/jira/browse/KAFKA-16685 > Project: Kafka > Issue Type: Improvement > Components: Tiered-Storage >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > > When RSMTask exceptions happen and are logged, it only includes the exception > message, but we lose the stack trace. > See > [https://github.com/apache/kafka/blob/70b8c5ae8e9336dbf4792e2d3cf100bbd70d6480/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L821-L831] > This makes it difficult to troubleshoot issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16685) RLMTask warn logs do not include parent exception trace
[ https://issues.apache.org/jira/browse/KAFKA-16685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jorge Esteban Quilcate Otoya updated KAFKA-16685: - Summary: RLMTask warn logs do not include parent exception trace (was: RSM Task warn logs do not include parent exception trace) > RLMTask warn logs do not include parent exception trace > --- > > Key: KAFKA-16685 > URL: https://issues.apache.org/jira/browse/KAFKA-16685 > Project: Kafka > Issue Type: Improvement > Components: Tiered-Storage >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > > When RSMTask exceptions happen and are logged, it only includes the exception > message, but we lose the stack trace. > See > [https://github.com/apache/kafka/blob/70b8c5ae8e9336dbf4792e2d3cf100bbd70d6480/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L821-L831] > This makes it difficult to troubleshoot issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16685) RLMTask warning logs do not include parent exception trace
[ https://issues.apache.org/jira/browse/KAFKA-16685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jorge Esteban Quilcate Otoya updated KAFKA-16685: - Description: When RLMTask warning exceptions happen and are logged, it only includes the exception message, but we lose the stack trace. See [https://github.com/apache/kafka/blob/70b8c5ae8e9336dbf4792e2d3cf100bbd70d6480/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L821-L831] This makes it difficult to troubleshoot issues. was: When RSMTask exceptions happen and are logged, it only includes the exception message, but we lose the stack trace. See [https://github.com/apache/kafka/blob/70b8c5ae8e9336dbf4792e2d3cf100bbd70d6480/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L821-L831] This makes it difficult to troubleshoot issues. > RLMTask warning logs do not include parent exception trace > -- > > Key: KAFKA-16685 > URL: https://issues.apache.org/jira/browse/KAFKA-16685 > Project: Kafka > Issue Type: Improvement > Components: Tiered-Storage >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > > When RLMTask warning exceptions happen and are logged, it only includes the > exception message, but we lose the stack trace. > See > [https://github.com/apache/kafka/blob/70b8c5ae8e9336dbf4792e2d3cf100bbd70d6480/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L821-L831] > This makes it difficult to troubleshoot issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16685) RSM Task warn logs do not include parent exception trace
Jorge Esteban Quilcate Otoya created KAFKA-16685: Summary: RSM Task warn logs do not include parent exception trace Key: KAFKA-16685 URL: https://issues.apache.org/jira/browse/KAFKA-16685 Project: Kafka Issue Type: Improvement Components: Tiered-Storage Reporter: Jorge Esteban Quilcate Otoya Assignee: Jorge Esteban Quilcate Otoya When RSMTask exceptions happen and are logged, it only includes the exception message, but we lose the stack trace. See [https://github.com/apache/kafka/blob/70b8c5ae8e9336dbf4792e2d3cf100bbd70d6480/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L821-L831] This makes it difficult to troubleshoot issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
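As a rough, dependency-free illustration of the problem described in KAFKA-16685, the sketch below uses `java.util.logging` rather than the SLF4J logger that Kafka itself uses, so every name in it (`WarnWithCauseSketch`, `CapturingHandler`, the log message text) is hypothetical and not taken from `RemoteLogManager`. It only shows the general difference between logging an exception's message and handing the exception object to the logger; with SLF4J the equivalent is to pass the exception as the final argument of `warn`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical sketch: not Kafka code, names are illustrative only.
final class WarnWithCauseSketch {

    /** Collects log records in memory so the two logging styles can be compared. */
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            records.add(record);
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    /** Creates a logger that writes only to the given in-memory handler. */
    static Logger newLogger(CapturingHandler handler) {
        Logger logger = Logger.getAnonymousLogger();
        logger.setUseParentHandlers(false);
        logger.addHandler(handler);
        return logger;
    }

    /** Message-only style: the stack trace is lost because only getMessage() is logged. */
    static void warnMessageOnly(Logger logger, Exception e) {
        logger.warning("Error occurred while copying log segments: " + e.getMessage());
    }

    /** Throwable-aware style: the exception travels with the record, so the trace can be printed. */
    static void warnWithCause(Logger logger, Exception e) {
        logger.log(Level.WARNING, "Error occurred while copying log segments", e);
    }

    /** Returns true if the most recent record carries a Throwable. */
    static boolean lastRecordHasThrowable(CapturingHandler handler) {
        return handler.records.get(handler.records.size() - 1).getThrown() != null;
    }
}
```

In this sketch only `warnWithCause` leaves a `Throwable` attached to the log record, which is the information a formatter needs in order to print the stack trace.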
Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]
cadonna merged PR #15870: URL: https://github.com/apache/kafka/pull/15870
Re: [PR] KAFKA-16394: Fix null propagation in foreign key join result [kafka]
AyoubOm commented on code in PR #15607: URL: https://github.com/apache/kafka/pull/15607#discussion_r1592389047 ## streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java: ## @@ -244,21 +248,21 @@ public void doJoinFromLeftThenDeleteLeftEntity() { ); } } -// Now delete one LHS entity such that one delete is propagated down to the output. -left.pipeInput("lhs1", (String) null, baseTimestamp + 6); +// Now delete one LHS entity such that one delete is propagated down to the output. +left.pipeInput("lhs1", null, baseTimestamp + 6); assertThat( -outputTopic.readKeyValuesToMap(), -is(mkMap( -mkEntry("lhs1", null) +outputTopic.readKeyValuesToList(), +is(Collections.singletonList( +KeyValue.pair("lhs1", null) )) ); if (rejoin) { assertThat( -rejoinOutputTopic.readKeyValuesToMap(), -is(mkMap( -mkEntry("lhs1", null) -)) +rejoinOutputTopic.readKeyValuesToList(), +hasItem( Review Comment: Not testing all items because the test is not deterministic, it may or may not contain `("lhs1", "rejoin((lhsValue1|rhs1,rhsValue1),null)")` depending on whether it's the left hand or the right hand side of the final join which propagates first. `rejoin` is computed this way in the test: ___LEFT || |RIGHT | | | | \ / / \ / / \ / / FKJoin/ \ / \ / Rejoin -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16394: Fix null propagation in foreign key join result [kafka]
AyoubOm commented on code in PR #15607: URL: https://github.com/apache/kafka/pull/15607#discussion_r1592386867 ## streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java: ## @@ -244,21 +248,21 @@ public void doJoinFromLeftThenDeleteLeftEntity() { ); } } -// Now delete one LHS entity such that one delete is propagated down to the output. -left.pipeInput("lhs1", (String) null, baseTimestamp + 6); +// Now delete one LHS entity such that one delete is propagated down to the output. +left.pipeInput("lhs1", null, baseTimestamp + 6); assertThat( -outputTopic.readKeyValuesToMap(), -is(mkMap( -mkEntry("lhs1", null) +outputTopic.readKeyValuesToList(), +is(Collections.singletonList( +KeyValue.pair("lhs1", null) )) ); if (rejoin) { assertThat( -rejoinOutputTopic.readKeyValuesToMap(), -is(mkMap( -mkEntry("lhs1", null) -)) +rejoinOutputTopic.readKeyValuesToList(), +hasItem( Review Comment: Not testing all items because the test is not deterministic, it may or may not contain `("lhs1", "rejoin((lhsValue1|rhs1,rhsValue1),null)")` depending on whether it's the left hand or the right hand side of the final join which propagates first. `rejoin` is computed this way in the test: ___LEFT || |RIGHT | | | | \ / / \ / / \ / / FKJoin/ \ / \ / Rejoin -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
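The reasoning in the review comment above (assert that the tombstone is present rather than asserting the exact output) can be sketched with plain Java collections. This is only an illustration under stated assumptions: `ContainsTombstoneSketch`, `pair`, and `containsTombstone` are hypothetical helpers, and `SimpleImmutableEntry` stands in for Kafka Streams' `KeyValue` so the example runs without any Kafka dependency.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: not the actual integration test, names are illustrative only.
final class ContainsTombstoneSketch {

    /** Builds a key/value pair; unlike Map.entry, this entry type permits a null value. */
    static Map.Entry<String, String> pair(String key, String value) {
        return new SimpleImmutableEntry<>(key, value);
    }

    /** True if the output contains a delete (null value) for the key, whatever else it contains. */
    static boolean containsTombstone(List<Map.Entry<String, String>> output, String key) {
        return output.contains(pair(key, null));
    }

    public static void main(String[] args) {
        // One possible ordering: an intermediate rejoin result is emitted before the delete.
        List<Map.Entry<String, String>> withIntermediate = Arrays.asList(
            pair("lhs1", "rejoin((lhsValue1|rhs1,rhsValue1),null)"),
            pair("lhs1", null));
        // The other possible ordering: only the delete is emitted.
        List<Map.Entry<String, String>> deleteOnly = Arrays.asList(pair("lhs1", null));

        // A membership check holds for both outcomes, while exact equality would hold for only one.
        System.out.println(containsTombstone(withIntermediate, "lhs1"));
        System.out.println(containsTombstone(deleteOnly, "lhs1"));
        System.out.println(withIntermediate.equals(deleteOnly));
    }
}
```

The trade-off is that a membership check is weaker than an exact comparison: it tolerates the extra intermediate record, but it would also tolerate other unexpected records.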
[jira] [Assigned] (KAFKA-16682) Rewrite JassTestUtils by Java
[ https://issues.apache.org/jira/browse/KAFKA-16682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16682: -- Assignee: TengYao Chi (was: Chia-Ping Tsai) > Rewrite JassTestUtils by Java > - > > Key: KAFKA-16682 > URL: https://issues.apache.org/jira/browse/KAFKA-16682 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > > as title > one more thing is that we should change the package name from kafka.utils to > kafka.security -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16682) Rewrite JassTestUtils by Java
[ https://issues.apache.org/jira/browse/KAFKA-16682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844261#comment-17844261 ] TengYao Chi commented on KAFKA-16682: - I am able to handle this issue. > Rewrite JassTestUtils by Java > - > > Key: KAFKA-16682 > URL: https://issues.apache.org/jira/browse/KAFKA-16682 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > as title > one more thing is that we should change the package name from kafka.utils to > kafka.security -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] a slight change. [kafka]
gongxuanzhang commented on PR #15812: URL: https://github.com/apache/kafka/pull/15812#issuecomment-2098224856
> Hi @gongxuanzhang and thank you for the contribution!
>
> This is actually an issue in a lot of places, I checked with this checkstyle.xml rule:
>
> ```
>
> ```
>
> and `./gradlew checkstyleMain checkstyleTest --continue`.
>
> While fixing this in one place is good, it would make sense to try and fix this everywhere if we decide to address it at all. I've created https://issues.apache.org/jira/browse/KAFKA-16643 for this, and you can take this on if you're interested. If so, please see the contributing guide: https://kafka.apache.org/contributing.html and join the mailing list and JIRA.
>
> | module | violations |
> | --- | --- |
> | `:streams:upgrade-system-tests-24:checkstyleTest` | 1 |
> | `:streams:upgrade-system-tests-22:checkstyleTest` | 1 |
> | `:streams:upgrade-system-tests-23:checkstyleTest` | 1 |
> | `:streams:upgrade-system-tests-26:checkstyleTest` | 1 |
> | `:streams:upgrade-system-tests-28:checkstyleTest` | 1 |
> | `:streams:upgrade-system-tests-30:checkstyleTest` | 1 |
> | `:streams:upgrade-system-tests-25:checkstyleTest` | 1 |
> | `:streams:upgrade-system-tests-31:checkstyleTest` | 1 |
> | `:streams:upgrade-system-tests-27:checkstyleTest` | 1 |
> | `:streams:upgrade-system-tests-32:checkstyleTest` | 1 |
> | `:streams:upgrade-system-tests-36:checkstyleTest` | 1 |
> | `:streams:upgrade-system-tests-35:checkstyleTest` | 1 |
> | `:streams:upgrade-system-tests-34:checkstyleTest` | 1 |
> | `:streams:upgrade-system-tests-33:checkstyleTest` | 1 |
> | `:streams:upgrade-system-tests-37:checkstyleTest` | 1 |
> | `:generator:checkstyleMain` | 2 |
> | `:connect:mirror-client:checkstyleMain` | 1 |
> | `:connect:api:checkstyleMain` | 1 |
> | `:connect:json:checkstyleMain` | 1 |
> | `:server-common:checkstyleMain` | 10 |
> | `:raft:checkstyleMain` | 11 |
> | `:storage:checkstyleMain` | 2 |
> | `:trogdor:checkstyleMain` | 17 |
> | `:server:checkstyleMain` | 122 |
> | `:connect:mirror:checkstyleMain` | 3 |
> | `:connect:test-plugins:checkstyleMain` | 2 |
> | `:tools:checkstyleMain` | 9 |
> | `:storage:storage-api:checkstyleMain` | 20 |
> | `:streams:examples:checkstyleMain` | 5 |
> | `:streams:test-utils:checkstyleMain` | 2 |
> | `:group-coordinator:checkstyleMain` | 46 |
> | `:metadata:checkstyleMain` | 72 |
> | `:raft:checkstyleTest` | 5 |
> | `:connect:runtime:checkstyleMain` | 2 |
> | `:group-coordinator:checkstyleTest` | 10 |
> | `:server-common:checkstyleTest` | 4 |
> | `:trogdor:checkstyleTest` | 3 |
> | `:metadata:checkstyleTest` | 73 |
> | `:streams:test-utils:checkstyleTest` | 12 |
> | `:streams:checkstyleMain` | 57 |
> | `:clients:checkstyleTest` | 87 |
> | `:clients:checkstyleMain` | 122 |
> | `:core:checkstyleMain` | 1 |
> | `:shell:checkstyleMain` | 10 |
> | `:core:checkstyleTest` | 12 |
> | `:shell:checkstyleTest` | 2 |
> | `:jmh-benchmarks:checkstyleMain` | 1 |
> | `:connect:mirror:checkstyleTest` | 4 |
> | `:connect:runtime:checkstyleTest` | 15 |
> | `:streams:checkstyleTest` | 201 |

Thank you for your answer! I will continue to follow up and contribute.
[PR] KAFKA-16678: Remove variable "unimplementedquorum" [kafka]
frankvicky opened a new pull request, #15879: URL: https://github.com/apache/kafka/pull/15879 Remove variable "unimplementedquorum" from EndToEndAuthorizationTest.scala and SaslEndToEndAuthorizationTest.scala - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]
nizhikov commented on code in PR #15873: URL: https://github.com/apache/kafka/pull/15873#discussion_r1592313103 ## core/src/test/java/kafka/admin/ConfigCommandUnitTest.java: ## @@ -410,6 +448,430 @@ public void testOptionEntityTypeNames() { doTestOptionEntityTypeNames(false); } +@Test +public void shouldFailIfUnrecognisedEntityTypeUsingZookeeper() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, +"--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"}); +assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT)); +} + +@Test +public void shouldFailIfUnrecognisedEntityType() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", +"--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"}); +assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts)); +} + +@Test +public void shouldFailIfBrokerEntityTypeIsNotAnIntegerUsingZookeeper() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, +"--entity-name", "A", "--entity-type", "brokers", "--alter", "--add-config", "a=b,c=d"}); +assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT)); +} + +@Test +public void shouldFailIfBrokerEntityTypeIsNotAnInteger() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", +"--entity-name", "A", "--entity-type", "brokers", "--alter", "--add-config", "a=b,c=d"}); +assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new 
DummyAdminClient(new Node(1, "localhost", 9092)), createOpts)); +} + +@Test +public void shouldFailIfShortBrokerEntityTypeIsNotAnIntegerUsingZookeeper() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, +"--broker", "A", "--alter", "--add-config", "a=b,c=d"}); +assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT)); +} + +@Test +public void shouldFailIfShortBrokerEntityTypeIsNotAnInteger() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", +"--broker", "A", "--alter", "--add-config", "a=b,c=d"}); +assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts)); +} + +@Test +public void shouldFailIfMixedEntityTypeFlagsUsingZookeeper() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, +"--entity-name", "A", "--entity-type", "users", "--client", "B", "--describe"}); +assertThrows(IllegalArgumentException.class, createOpts::checkArgs); +} + +@Test +public void shouldFailIfMixedEntityTypeFlags() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", +"--entity-name", "A", "--entity-type", "users", "--client", "B", "--describe"}); +assertThrows(IllegalArgumentException.class, createOpts::checkArgs); +} + +@Test +public void shouldFailIfInvalidHost() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", +"--entity-name", "A,B", "--entity-type", "ips", "--describe"}); +assertThrows(IllegalArgumentException.class, createOpts::checkArgs); +} + +@Test +public void shouldFailIfInvalidHostUsingZookeeper() { 
+ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, +"--entity-name", "A,B", "--entity-type", "ips", "--describe"}); +assertThrows(IllegalArgumentException.class, createOpts::checkArgs); +} + +@Test +public void shouldFailIfUnresolvableHost() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", +"--entity-name", "RFC2606.invalid", "--entity-type", "ips",
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1592297330 ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java: ## @@ -55,7 +57,7 @@ public class TieredStorageTestUtils { // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before // segments eligible for deletion gets physically removed. -public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5; +public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10; Review Comment: Yes, I think the test is flaky because the CI environment is quite slow, and the IO may be slower than we thought. In the log I can see that at `08:59:27,187` we copied `0002.log` to remote. About 1 second later (`08:59:28,300`), the test timed out and all resources started to be closed. I already waited 10 seconds for the log deletion to complete, but that is evidently not enough for the CI environment. I've increased the timeout to 20 seconds to see if that fixes the issue. I think we've done what we can to make it faster (i.e. set the configs to speed up the tests). ``` [2024-05-04 08:59:27,187] INFO [RemoteLogManager=0 partition=DcnVRVRSQd675ZLtCIn21A:topicB-0] Copied 0002.log to remote storage with segment-id: RemoteLogSegmentId{topicIdPartition=DcnVRVRSQd675ZLtCIn21A:topicB-0, id=gcVp790dRlmFCr_0tN0NTg} (kafka.log.remote.RemoteLogManager$RLMTask:792) [2024-05-04 08:59:28,300] INFO Closing topic-based RLMM resources [2024-05-04 08:59:28,304] INFO Closing the instance (org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask:328) [2024-05-04 08:59:28,308] INFO Exited from consumer task thread (org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask:151) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
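The timeout discussion in the comment above can be illustrated with a generic poll-until-deadline helper. This is only a sketch: `WaitUntilSketch` and `waitUntil` are hypothetical names, not the helper used in `TieredStorageTestUtils`, and the timeout values in `main` are arbitrary.

```java
import java.util.function.BooleanSupplier;

final class WaitUntilSketch {

    // Polls `condition` until it holds or `timeoutMs` elapses.
    // Returns true if the condition held before the deadline, false otherwise.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // deadline passed; a slow CI host may simply need a larger timeout
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Hypothetical condition: becomes true roughly 50 ms after start.
        boolean ok = waitUntil(() -> System.currentTimeMillis() - start >= 50, 2_000, 10);
        System.out.println("condition met before timeout: " + ok);
    }
}
```

With a helper of this shape, raising the timeout only lengthens the failing case; a passing run still returns as soon as the condition holds.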
Re: [PR] KAFKA-10199: Revoke tasks from state updater with new remove [kafka]
cadonna commented on code in PR #15871: URL: https://github.com/apache/kafka/pull/15871#discussion_r1592297149 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -623,6 +623,21 @@ private void addToTasksToClose(final Map> futures, Review Comment: Yes, we are going to re-use it. Stay tuned! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16684) FetchResponse#responseData could return incorrect data
[ https://issues.apache.org/jira/browse/KAFKA-16684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844251#comment-17844251 ] Chia-Ping Tsai commented on KAFKA-16684: After [https://github.com/apache/kafka/commit/2b8aff58b575c199ee8372e5689420c9d77357a5] , I don't think the "cache" is useful. Hence, we can just remove the cache to fix this potential bug > FetchResponse#responseData could return incorrect data > -- > > Key: KAFKA-16684 > URL: https://issues.apache.org/jira/browse/KAFKA-16684 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > [https://github.com/apache/kafka/commit/2b8aff58b575c199ee8372e5689420c9d77357a5] > make it accept input to return "partial" data. The content of output is > based on the input but we cache the output ... It will return same output > even though we pass different input. That is a potential bug. -- This message was sent by Atlassian Jira (v8.20.10#820010)
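The bug described above (output that depends on the input, but is cached after the first call) can be sketched in isolation. This is not the actual `FetchResponse` code; `CachedViewSketch`, `cachedView`, and `view` are hypothetical names used only to show the pattern and the proposed fix of dropping the cache.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class CachedViewSketch {
    private final Map<String, Integer> all = new HashMap<>();
    private Map<String, Integer> cached; // filled on the first call, never invalidated

    CachedViewSketch(Map<String, Integer> data) {
        all.putAll(data);
    }

    // Buggy pattern: the result depends on `wanted`, but only the first result is kept.
    Map<String, Integer> cachedView(Set<String> wanted) {
        if (cached == null) {
            cached = compute(wanted);
        }
        return cached;
    }

    // Fix along the lines suggested in the comment: no cache, recompute per input.
    Map<String, Integer> view(Set<String> wanted) {
        return compute(wanted);
    }

    private Map<String, Integer> compute(Set<String> wanted) {
        Map<String, Integer> out = new HashMap<>();
        for (Map.Entry<String, Integer> e : all.entrySet()) {
            if (wanted.contains(e.getKey())) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> data = new HashMap<>();
        data.put("topicA", 1);
        data.put("topicB", 2);
        CachedViewSketch sketch = new CachedViewSketch(data);
        sketch.cachedView(Collections.singleton("topicA"));
        // The second call asks for topicB but still gets the topicA result.
        System.out.println(sketch.cachedView(Collections.singleton("topicB")));
        System.out.println(sketch.view(Collections.singleton("topicB")));
    }
}
```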
[jira] [Created] (KAFKA-16684) FetchResponse#responseData could return incorrect data
Chia-Ping Tsai created KAFKA-16684: -- Summary: FetchResponse#responseData could return incorrect data Key: KAFKA-16684 URL: https://issues.apache.org/jira/browse/KAFKA-16684 Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai [https://github.com/apache/kafka/commit/2b8aff58b575c199ee8372e5689420c9d77357a5] make it accept input to return "partial" data. The content of output is based on the input but we cache the output ... It will return same output even though we pass different input. That is a potential bug. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]
nizhikov commented on code in PR #15873: URL: https://github.com/apache/kafka/pull/15873#discussion_r1592249836 ## core/src/test/java/kafka/admin/ConfigCommandUnitTest.java: ## @@ -418,4 +880,77 @@ public static String[] toArray(String... first) { public static String[] toArray(List... lists) { return Stream.of(lists).flatMap(List::stream).toArray(String[]::new); } + +@SafeVarargs +public static List concat(List... lists) { +return Stream.of(lists).flatMap(List::stream).collect(Collectors.toList()); +} + +@SafeVarargs +public static Map concat(Map...maps) { +Map res = new HashMap<>(); Review Comment: Thanks. ~~Used your version of method.~~ Actually, we can't use this version (I had a similar one at first), because some values of the maps being concatenated are null, and the `merge` function used in `Collectors.toMap` requires that `value` is not null: ``` @Override public V merge(K key, V value, BiFunction remappingFunction) { if (value == null) throw new NullPointerException(); if (remappingFunction == null) throw new NullPointerException(); ... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
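The null-value problem described in this comment can be reproduced with the JDK alone. The sketch below is not the code from the PR; `NullTolerantConcat`, `concatWithToMap`, and the generic signatures are assumptions made for illustration. It contrasts a `Collectors.toMap`-based concat with a loop-based one that tolerates null values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class NullTolerantConcat {

    // Stream-based variant: Collectors.toMap accumulates through Map.merge,
    // which throws NullPointerException when a value is null.
    @SafeVarargs
    static <K, V> Map<K, V> concatWithToMap(Map<K, V>... maps) {
        return Stream.of(maps)
            .flatMap(m -> m.entrySet().stream())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> b));
    }

    // Loop-based variant: HashMap.putAll accepts null values,
    // and later maps overwrite earlier ones on key collisions.
    @SafeVarargs
    static <K, V> Map<K, V> concat(Map<K, V>... maps) {
        Map<K, V> res = new HashMap<>();
        for (Map<K, V> m : maps) {
            res.putAll(m);
        }
        return res;
    }

    public static void main(String[] args) {
        Map<String, String> a = new HashMap<>();
        a.put("k1", "v1");
        Map<String, String> b = new HashMap<>();
        b.put("k2", null); // a null value, as in the test data the comment mentions

        System.out.println("loop-based size: " + concat(a, b).size());
        try {
            concatWithToMap(a, b);
        } catch (NullPointerException e) {
            System.out.println("toMap-based concat rejected the null value");
        }
    }
}
```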
[jira] [Commented] (KAFKA-12385) Remove FetchResponse#responseData
[ https://issues.apache.org/jira/browse/KAFKA-12385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844248#comment-17844248 ] Chia-Ping Tsai commented on KAFKA-12385: It seems to me `FetchResponse#responseData` is a weird function. The content of output is based on the input but we cache the output ... It will return same output even though we pass different input. That is a potential bug. > Remove FetchResponse#responseData > - > > Key: KAFKA-12385 > URL: https://issues.apache.org/jira/browse/KAFKA-12385 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > reference to [https://github.com/apache/kafka/pull/9758#discussion_r584142074] > We can rewrite related code to avoid using stale data structure. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-10199: Revoke tasks from state updater with new remove [kafka]
lucasbru commented on code in PR #15871: URL: https://github.com/apache/kafka/pull/15871#discussion_r1592261128 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -623,6 +623,21 @@ private void addToTasksToClose(final Map> futures, Review Comment: Are we going to reuse this method? Otherwise, I find this a bit too abstract, and I'd just inline it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]
nizhikov commented on code in PR #15873: URL: https://github.com/apache/kafka/pull/15873#discussion_r1592246946 ## core/src/test/java/kafka/admin/ConfigCommandUnitTest.java: ## @@ -410,6 +448,430 @@ public void testOptionEntityTypeNames() { doTestOptionEntityTypeNames(false); } +@Test +public void shouldFailIfUnrecognisedEntityTypeUsingZookeeper() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, +"--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"}); +assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT)); +} + +@Test +public void shouldFailIfUnrecognisedEntityType() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", +"--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"}); +assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts)); +} + +@Test +public void shouldFailIfBrokerEntityTypeIsNotAnIntegerUsingZookeeper() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, +"--entity-name", "A", "--entity-type", "brokers", "--alter", "--add-config", "a=b,c=d"}); +assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT)); +} + +@Test +public void shouldFailIfBrokerEntityTypeIsNotAnInteger() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", +"--entity-name", "A", "--entity-type", "brokers", "--alter", "--add-config", "a=b,c=d"}); +assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new 
DummyAdminClient(new Node(1, "localhost", 9092)), createOpts)); +} + +@Test +public void shouldFailIfShortBrokerEntityTypeIsNotAnIntegerUsingZookeeper() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, +"--broker", "A", "--alter", "--add-config", "a=b,c=d"}); +assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT)); +} + +@Test +public void shouldFailIfShortBrokerEntityTypeIsNotAnInteger() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", +"--broker", "A", "--alter", "--add-config", "a=b,c=d"}); +assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts)); +} + +@Test +public void shouldFailIfMixedEntityTypeFlagsUsingZookeeper() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, +"--entity-name", "A", "--entity-type", "users", "--client", "B", "--describe"}); +assertThrows(IllegalArgumentException.class, createOpts::checkArgs); +} + +@Test +public void shouldFailIfMixedEntityTypeFlags() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", +"--entity-name", "A", "--entity-type", "users", "--client", "B", "--describe"}); +assertThrows(IllegalArgumentException.class, createOpts::checkArgs); +} + +@Test +public void shouldFailIfInvalidHost() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", +"--entity-name", "A,B", "--entity-type", "ips", "--describe"}); +assertThrows(IllegalArgumentException.class, createOpts::checkArgs); +} + +@Test +public void shouldFailIfInvalidHostUsingZookeeper() { 
+ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, +"--entity-name", "A,B", "--entity-type", "ips", "--describe"}); +assertThrows(IllegalArgumentException.class, createOpts::checkArgs); +} + +@Test +public void shouldFailIfUnresolvableHost() { +ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", +"--entity-name", "RFC2606.invalid", "--entity-type", "ips",
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
clolov commented on PR #15673: URL: https://github.com/apache/kafka/pull/15673#issuecomment-2097975706 Heya @junrao! Let me know your thoughts on my responses. I have further updated the below + rebased. I am waiting on the tests to finish running and if they uncover something I will aim to remedy it today. MetadataVersionTest - I added 3.8-IV1 where I believe it is needed QuorumControllerTest - I did not add 3.8-IV1 because I believe the references to 3.8-IV0 are to test the ELR feature PartitionRegistrationTest - Updated BrokerMetadataPublisherTest - Updated ZkMigrationIntegrationTest - Updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16679) Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, `FeatureCommandUnitTest` into
[ https://issues.apache.org/jira/browse/KAFKA-16679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16679: -- Assignee: Cheng-Kai, Zhang (was: Chia-Ping Tsai) > Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, > `FeatureCommandUnitTest` into > > > Key: KAFKA-16679 > URL: https://issues.apache.org/jira/browse/KAFKA-16679 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Cheng-Kai, Zhang >Priority: Minor > > Normally, we don't put multi test classes into single file. Those test > classes can be extracted into a new class file. Or we can merge them into > single class by using "@Test" annotation. That can make those test cases run > without embedded cluster. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2097869168 @junrao @satishd @chia7712 @showuon Updated the test plan in the summary. Verified that the patch fixes the issue by running the trunk and patched build. With the fix, the high-watermark value gets updated to the valid offset. Please take a look when you get chance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
clolov commented on code in PR #15673: URL: https://github.com/apache/kafka/pull/15673#discussion_r1592116050 ## clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java: ## @@ -70,7 +70,7 @@ else if (isolationLevel == IsolationLevel.READ_COMMITTED) minVersion = 2; else if (requireTimestamp) minVersion = 1; -return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel); +return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(false), CONSUMER_REPLICA_ID, isolationLevel); Review Comment: We cannot test the latest unstable version in the client, correct. This should be only temporary until I release the subsequent pull request where we introduce a correct OffsetSpec to allow clients to correctly call this behaviour and introduce tests for it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
clolov commented on code in PR #15673: URL: https://github.com/apache/kafka/pull/15673#discussion_r1592114443 ## core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala: ## @@ -219,7 +219,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { TestUtils.generateAndProduceMessages(servers, topic, 9) TestUtils.produceMessage(servers, topic, "test-10", System.currentTimeMillis() + 10L) -for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion) { +for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion(false)) { Review Comment: I want to hide the latest unstable version of the ListOffsetRequest from everywhere. A follow-up pull request will allow it to be called, introduce the changes needed for the client to call it with a newer OffsetSpec, and add a test confirming the behaviour. Does this make sense or am I misunderstanding the question? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
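The idea of hiding an unstable last version behind a boolean flag, as `latestVersion(false)` does in the diff above, can be sketched roughly as follows. `ApiVersionRangeSketch` is a hypothetical stand-in, not Kafka's `ApiKeys`, and the version numbers in `main` are illustrative only.

```java
final class ApiVersionRangeSketch {
    private final short oldest;
    private final short latest;
    private final boolean latestIsUnstable;

    ApiVersionRangeSketch(short oldest, short latest, boolean latestIsUnstable) {
        this.oldest = oldest;
        this.latest = latest;
        this.latestIsUnstable = latestIsUnstable;
    }

    short oldestVersion() {
        return oldest;
    }

    // When the last version is marked unstable and the caller has not opted in,
    // report the previous version as the latest one.
    short latestVersion(boolean enableUnstableLastVersion) {
        if (latestIsUnstable && !enableUnstableLastVersion && latest > oldest) {
            return (short) (latest - 1);
        }
        return latest;
    }

    public static void main(String[] args) {
        // Illustrative numbers only: versions 0..9, with 9 still unstable.
        ApiVersionRangeSketch api = new ApiVersionRangeSketch((short) 0, (short) 9, true);
        // A test loop written this way never exercises the unstable version.
        for (short v = api.oldestVersion(); v <= api.latestVersion(false); v++) {
            System.out.println("would test version " + v);
        }
    }
}
```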
[jira] [Commented] (KAFKA-16679) Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, `FeatureCommandUnitTest` into
[ https://issues.apache.org/jira/browse/KAFKA-16679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844222#comment-17844222 ] Cheng-Kai, Zhang commented on KAFKA-16679: -- Hi [~chia7712] , could I work on this one? :D > Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, > `FeatureCommandUnitTest` into > > > Key: KAFKA-16679 > URL: https://issues.apache.org/jira/browse/KAFKA-16679 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > Normally, we don't put multi test classes into single file. Those test > classes can be extracted into a new class file. Or we can merge them into > single class by using "@Test" annotation. That can make those test cases run > without embedded cluster. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Allowing WriteTxnMarkers API to run with AlterCluster permissions [kafka]
sidyag commented on code in PR #15837: URL: https://github.com/apache/kafka/pull/15837#discussion_r1592106849 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2822,6 +2822,31 @@ class KafkaApisTest extends Logging { () => kafkaApis.handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching)) } + @Test + def requiredAclsNotPresentWriteTxnMarkersThrowsAuthorizationException(): Unit = { Review Comment: That is the happy case path verified by existing tests. As mocks are not present there, by default the CLUSTER_ACTION check doesn't throw an exception, and the ALTER check returns false. I can modify the existing tests to make that explicit and duplicate it to test for the second scenario. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
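The two-permission behaviour described in this comment (an ALTER check that returns a boolean, and a CLUSTER_ACTION check that throws when the permission is missing) can be sketched as below. This is a simplified model under stated assumptions, not the `KafkaApis` code: `WriteTxnMarkersAuthSketch`, `Op`, and the use of `SecurityException` are hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;

final class WriteTxnMarkersAuthSketch {
    enum Op { CLUSTER_ACTION, ALTER }

    // Passes if the caller holds ALTER on the cluster; otherwise falls back to
    // the CLUSTER_ACTION check, which throws when that permission is missing too.
    static void authorize(Set<Op> granted) {
        if (granted.contains(Op.ALTER)) {
            return;
        }
        if (!granted.contains(Op.CLUSTER_ACTION)) {
            throw new SecurityException("neither ALTER nor CLUSTER_ACTION is granted");
        }
    }

    public static void main(String[] args) {
        authorize(EnumSet.of(Op.CLUSTER_ACTION)); // default happy path
        authorize(EnumSet.of(Op.ALTER));          // second scenario from the comment
        try {
            authorize(EnumSet.noneOf(Op.class));
        } catch (SecurityException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under this model there are three cases worth a test: only CLUSTER_ACTION, only ALTER, and neither.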
[PR] KAFKA-16676: Add missing RPCs to security.html [kafka]
AndrewJSchofield opened a new pull request, #15878: URL: https://github.com/apache/kafka/pull/15878 KIP-714 and KIP-1000 introduced 3 new RPCs. These new RPCs were not added to docs/security.html to document the authorization checks required to perform the requests. While I was adding these RPCs, I noticed that a few other recent RPCs had also not been added, so I added those too. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16683) Extract security-related helpers from scala.TestUtils to java class
Chia-Ping Tsai created KAFKA-16683: -- Summary: Extract security-related helpers from scala.TestUtils to java class Key: KAFKA-16683 URL: https://issues.apache.org/jira/browse/KAFKA-16683 Project: Kafka Issue Type: Sub-task Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai We can merge them into `JaasTestUtils` and then rename `JaasTestUtils` to `SecurityTestUtils`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16680) Make ClusterTestExtensions support SASL
[ https://issues.apache.org/jira/browse/KAFKA-16680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16680: --- Description: This is a umbrella issue. In order to migrate more tests to new test infra, we ought to make it support SASL at least. *phase1: reuse/rewrite existent SASL utils by Java* # MiniKdc # JaasTestUtils # Move security-related helpers from scala.TestUtils # extract/rewrite non-zk code from SaslSetup to new java class *phase2: make `ClusterTest#securityProtocol` works. It does not work for kraft mode :(* # add client-related helper to generate consumer/producer/admin class with security configs # configure kraft server with security settings # migrate tests of tools to use new test infra with security was: This is a umbrella issue. In order to migrate more tests to new test infra, we ought to make it support SASL at least. *phase1: reuse/rewrite existent SASL utils by Java* # MiniKdc # JaasTestUtils # Move security-related helpers from scala.TestUtils to java.TestUtils # extract/rewrite non-zk code from SaslSetup to new java class *phase2: make `ClusterTest#securityProtocol` works. It does not work for kraft mode :(* # add client-related helper to generate consumer/producer/admin class with security configs # configure kraft server with security settings # migrate tests of tools to use new test infra with security > Make ClusterTestExtensions support SASL > --- > > Key: KAFKA-16680 > URL: https://issues.apache.org/jira/browse/KAFKA-16680 > Project: Kafka > Issue Type: New Feature >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > This is a umbrella issue. > In order to migrate more tests to new test infra, we ought to make it support > SASL at least. 
> *phase1: reuse/rewrite existent SASL utils by Java* > # MiniKdc > # JaasTestUtils > # Move security-related helpers from scala.TestUtils > # extract/rewrite non-zk code from SaslSetup to new java class > *phase2: make `ClusterTest#securityProtocol` works. It does not work for > kraft mode :(* > # add client-related helper to generate consumer/producer/admin class with > security configs > # configure kraft server with security settings > # migrate tests of tools to use new test infra with security > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16682) Rewrite JassTestUtils by Java
Chia-Ping Tsai created KAFKA-16682: -- Summary: Rewrite JassTestUtils by Java Key: KAFKA-16682 URL: https://issues.apache.org/jira/browse/KAFKA-16682 Project: Kafka Issue Type: Sub-task Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai As the title says. One more thing: we should change the package name from kafka.utils to kafka.security. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16681) Rewrite MiniKDC by Java
Chia-Ping Tsai created KAFKA-16681: -- Summary: Rewrite MiniKDC by Java Key: KAFKA-16681 URL: https://issues.apache.org/jira/browse/KAFKA-16681 Project: Kafka Issue Type: Sub-task Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai Notes: # we need to move it from the scala folder to the java folder # don't change the package name, since the system tests require it -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16665: Allow to initialize newly assigned partition's positions without allowing fetching while callback runs [kafka]
lucasbru merged PR #15856: URL: https://github.com/apache/kafka/pull/15856 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close
[ https://issues.apache.org/jira/browse/KAFKA-16589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16589: --- Parent: KAFKA-16680 Issue Type: Sub-task (was: Improvement) > Consider removing `ClusterInstance#createAdminClient` since callers are not > sure whether they need to call close > > > Key: KAFKA-16589 > URL: https://issues.apache.org/jira/browse/KAFKA-16589 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Minor > > Sometimes we close the admin created by `createAdminClient`, and sometimes we > don't. That is not a real problem since the `ClusterInstance` will call > `close` when stopping. > However, that causes a lot of inconsistent code, and in fact it does not save > much time since creating an Admin is not hard work. We can get > `bootstrapServers` and `bootstrapControllers` from `ClusterInstance` easily. > > {code:java} > // before > try (Admin admin = cluster.createAdminClient()) { } > // after v0 > try (Admin admin = Admin.create(Collections.singletonMap( > CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, > cluster.bootstrapServers()))) {} > {code} > Personally, the `after` version is not verbose, but we can have alternatives: > `Map clientConfigs`. > > {code:java} > // after v1 > try (Admin admin = Admin.create(cluster.clientConfigs())) {}{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16680) Make ClusterTestExtensions support SASL
Chia-Ping Tsai created KAFKA-16680: -- Summary: Make ClusterTestExtensions support SASL Key: KAFKA-16680 URL: https://issues.apache.org/jira/browse/KAFKA-16680 Project: Kafka Issue Type: New Feature Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai This is a umbrella issue. In order to migrate more tests to new test infra, we ought to make it support SASL at least. *phase1: reuse/rewrite existent SASL utils by Java* # MiniKdc # JaasTestUtils # Move security-related helpers from scala.TestUtils to java.TestUtils # extract/rewrite non-zk code from SaslSetup to new java class *phase2: make `ClusterTest#securityProtocol` works. It does not work for kraft mode :(* # add client-related helper to generate consumer/producer/admin class with security configs # configure kraft server with security settings # migrate tests of tools to use new test infra with security -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Remove dev_version parameter from streams tests [kafka]
lucasbru commented on PR #15874: URL: https://github.com/apache/kafka/pull/15874#issuecomment-2097704867 @mjsax It does change, because we need to keep a list of tests (with parameters) that do not work with ARM. The `from_version` parameter does not change, because only old versions are affected. But the `to_version` parameter changed with every minor release. After removing the `to_version` parameter, we won't have to change the list again. Does that clarify the intent here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]
lucasbru commented on code in PR #15870: URL: https://github.com/apache/kafka/pull/15870#discussion_r1591960681 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId taskId) { tasks.addPendingTaskToCloseClean(taskId); } +private void addToTasksToClose(final Map> futures, + final Set tasksToCloseCleanFromStateUpdater, + final Set tasksToCloseDirtyFromStateUpdater) { +iterateAndActOnFuture(futures, removedTaskResult -> { +final Task task = removedTaskResult.task(); +final Optional exception = removedTaskResult.exception(); +if (exception.isPresent()) { +tasksToCloseDirtyFromStateUpdater.add(task); +} else { +tasksToCloseCleanFromStateUpdater.add(task); +} +}); +} + +private void iterateAndActOnFuture(final Map> futures, + final java.util.function.Consumer action) { +for (final Map.Entry> entry : futures.entrySet()) { +final TaskId taskId = entry.getKey(); +final CompletableFuture future = entry.getValue(); +try { +final StateUpdater.RemovedTaskResult removedTaskResult = waitForFuture(taskId, future); +action.accept(removedTaskResult); +} catch (final ExecutionException executionException) { +log.warn("An exception happened when removing task {} from the state updater. The exception will be handled later: ", +taskId, executionException); +} catch (final InterruptedException ignored) { } Review Comment: Sounds good! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16677) Replace ClusterType#ALL and ClusterType#DEFAULT by Array
[ https://issues.apache.org/jira/browse/KAFKA-16677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16677: -- Assignee: PoAn Yang (was: Chia-Ping Tsai) > Replace ClusterType#ALL and ClusterType#DEFAULT by Array > > > Key: KAFKA-16677 > URL: https://issues.apache.org/jira/browse/KAFKA-16677 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Minor > > Both ClusterType#ALL and ClusterType#DEFAULT are a kind of "tag" instead of > true "type". It seems to me they can be removed by using Array. For example: > ClusterType#ALL -> {Type.ZK, Type.KRAFT, Type.CO_KRAFT} > ClusterType#DEFAULT -> {} > There are two benefits > 1. That is more readable for "ALL type". > 2. We don't throw the awkward "exception" when seeing "DEFAULT". -- This message was sent by Atlassian Jira (v8.20.10#820010)
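The proposal in this ticket can be illustrated with a small sketch: an explicit array of types takes the place of `ALL`, and an empty array takes the place of `DEFAULT`. The enum and the `resolve` helper below are hypothetical and are not the actual Kafka test-framework code; they only show how an empty array could fall back to a default without a special enum constant.

```java
import java.util.Arrays;
import java.util.List;

final class ClusterTypesSketch {

    // Only the concrete cluster types remain; ALL and DEFAULT are gone.
    enum Type { ZK, KRAFT, CO_KRAFT }

    // An empty array plays the role of the former DEFAULT: use the fallback types.
    // A non-empty array is taken as-is, so "ALL" is simply every constant listed.
    static List<Type> resolve(Type[] declared, Type[] fallback) {
        return Arrays.asList(declared.length == 0 ? fallback : declared);
    }

    public static void main(String[] args) {
        Type[] fallback = {Type.ZK};
        System.out.println(resolve(new Type[0], fallback));
        System.out.println(resolve(Type.values(), fallback));
    }
}
```

With this shape no code path has to throw when it encounters a placeholder value, because no placeholder value exists.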
[jira] [Created] (KAFKA-16679) Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, `FeatureCommandUnitTest` into
Chia-Ping Tsai created KAFKA-16679: -- Summary: Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, `FeatureCommandUnitTest` into Key: KAFKA-16679 URL: https://issues.apache.org/jira/browse/KAFKA-16679 Project: Kafka Issue Type: Test Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai Normally, we don't put multiple test classes into a single file. Those test classes can be extracted into new class files, or we can merge them into a single class by using the "@Test" annotation, which lets those test cases run without an embedded cluster. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16677) Replace ClusterType#ALL and ClusterType#DEFAULT by Array
[ https://issues.apache.org/jira/browse/KAFKA-16677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844163#comment-17844163 ] PoAn Yang commented on KAFKA-16677: --- Hi [~chia7712], I'm interested in this feature. If you're not working on it, may I assign to myself? Thank you. > Replace ClusterType#ALL and ClusterType#DEFAULT by Array > > > Key: KAFKA-16677 > URL: https://issues.apache.org/jira/browse/KAFKA-16677 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > Both ClusterType#ALL and ClusterType#DEFAULT are a kind of "tag" instead of > true "type". It seems to me they can be removed by using Array. For example: > ClusterType#ALL -> {Type.ZK, Type.KRAFT, Type.CO_KRAFT} > ClusterType#DEFAULT -> {} > There are two benefits > 1. That is more readable for "ALL type". > 2. We don't throw the awkward "exception" when seeing "DEFAULT". -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16615) JoinGroup API for upgrading ConsumerGroup
[ https://issues.apache.org/jira/browse/KAFKA-16615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-16615. - Fix Version/s: 3.8.0 Reviewer: David Jacot Assignee: Dongnuo Lyu Resolution: Fixed > JoinGroup API for upgrading ConsumerGroup > - > > Key: KAFKA-16615 > URL: https://issues.apache.org/jira/browse/KAFKA-16615 > Project: Kafka > Issue Type: Sub-task >Reporter: Dongnuo Lyu >Assignee: Dongnuo Lyu >Priority: Major > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dajac merged PR #15798: URL: https://github.com/apache/kafka/pull/15798
[jira] [Assigned] (KAFKA-16678) Remove unimplementedquorum from EndToEndAuthorizationTest
[ https://issues.apache.org/jira/browse/KAFKA-16678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16678: -- Assignee: TengYao Chi (was: Chia-Ping Tsai) > Remove unimplementedquorum from EndToEndAuthorizationTest > - > > Key: KAFKA-16678 > URL: https://issues.apache.org/jira/browse/KAFKA-16678 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > > `unimplementedquorum`[0] is used to skip test cases if they don't support to > run by kraft. However, KAFKA-15219 , KAFKA-14765 and KAFKA-14776 make related > tests support to run by kraft. > In short, it is time to remove the unused variable :) > [0] > [https://github.com/apache/kafka/blob/d76352e2151178521dc447e3406dabb8fcd4c57c/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala#L146] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16678) Remove unimplementedquorum from EndToEndAuthorizationTest
[ https://issues.apache.org/jira/browse/KAFKA-16678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844157#comment-17844157 ] TengYao Chi commented on KAFKA-16678: - I am able to handle this issue. > Remove unimplementedquorum from EndToEndAuthorizationTest > - > > Key: KAFKA-16678 > URL: https://issues.apache.org/jira/browse/KAFKA-16678 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > `unimplementedquorum`[0] is used to skip test cases if they don't support to > run by kraft. However, KAFKA-15219 , KAFKA-14765 and KAFKA-14776 make related > tests support to run by kraft. > In short, it is time to remove the unused variable :) > [0] > [https://github.com/apache/kafka/blob/d76352e2151178521dc447e3406dabb8fcd4c57c/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala#L146] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16678) Remove unimplementedquorum from EndToEndAuthorizationTest
Chia-Ping Tsai created KAFKA-16678: -- Summary: Remove unimplementedquorum from EndToEndAuthorizationTest Key: KAFKA-16678 URL: https://issues.apache.org/jira/browse/KAFKA-16678 Project: Kafka Issue Type: Test Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai `unimplementedquorum`[0] is used to skip test cases that do not support running with KRaft. However, KAFKA-15219, KAFKA-14765 and KAFKA-14776 made the related tests work with KRaft. In short, it is time to remove the unused variable :) [0] [https://github.com/apache/kafka/blob/d76352e2151178521dc447e3406dabb8fcd4c57c/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala#L146] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Kafka-16668: improve cluster test [kafka]
chia7712 commented on code in PR #15861: URL: https://github.com/apache/kafka/pull/15861#discussion_r1591866899 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -306,11 +300,16 @@ public Builder setPerBrokerProperties(Map> perBroke return this; } +public Builder setTags(String[] tags) { Review Comment: We use `Array` in annotation, but it would be better to use `List` here. ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -153,15 +148,19 @@ public Map> perBrokerOverrideProperties() { return perBrokerOverrideProperties; } -public Map nameTags() { -Map tags = new LinkedHashMap<>(4); -name().ifPresent(name -> tags.put("Name", name)); -tags.put("MetadataVersion", metadataVersion.toString()); -tags.put("Security", securityProtocol.name()); -listenerName().ifPresent(listener -> tags.put("Listener", listener)); +public String[] tags() { return tags; } +public Map nameTags() { Review Comment: Could you please add the `tags` to this method? Also, it would be great to rename it to "displayName", and the return type should be `String` instead of `Map` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
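The suggestion above (fold the tags into the display method and return a `String` instead of a `Map`) might look roughly like the sketch below. Everything here is hypothetical: the `displayName` signature, the separator, and the `Tags=` label are assumptions for illustration, not the code that was eventually merged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DisplayNameSketch {

    // Joins the fixed attributes and the optional user-supplied tags into one label.
    // Taking a List rather than String[] follows the first review comment.
    static String displayName(String metadataVersion, String securityProtocol, List<String> tags) {
        List<String> parts = new ArrayList<>();
        parts.add("MetadataVersion=" + metadataVersion);
        parts.add("Security=" + securityProtocol);
        if (!tags.isEmpty()) {
            parts.add("Tags=" + String.join(",", tags));
        }
        return String.join(", ", parts);
    }

    public static void main(String[] args) {
        System.out.println(displayName("3.8-IV0", "PLAINTEXT", Arrays.asList("a", "b")));
    }
}
```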
Re: [PR] Kafka-16668: improve cluster test [kafka]
chia7712 commented on PR #15861: URL: https://github.com/apache/kafka/pull/15861#issuecomment-2097528853 Now that #15766 has been merged, could you please fix the build error?
[jira] [Commented] (KAFKA-16677) Replace ClusterType#ALL and ClusterType#DEFAULT by Array
[ https://issues.apache.org/jira/browse/KAFKA-16677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844150#comment-17844150 ] PoAn Yang commented on KAFKA-16677: --- Hi [~chia7712], I think the feature is good. Most of time, we use same config to run KRAFT and CO_KRAFT. If we can use `setTypes`, we don't need to create another builder. > Replace ClusterType#ALL and ClusterType#DEFAULT by Array > > > Key: KAFKA-16677 > URL: https://issues.apache.org/jira/browse/KAFKA-16677 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > Both ClusterType#ALL and ClusterType#DEFAULT are a kind of "tag" instead of > true "type". It seems to me they can be removed by using Array. For example: > ClusterType#ALL -> {Type.ZK, Type.KRAFT, Type.CO_KRAFT} > ClusterType#DEFAULT -> {} > There are two benefits > 1. That is more readable for "ALL type". > 2. We don't throw the awkward "exception" when seeing "DEFAULT". -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: log newly created processId [kafka]
chia7712 merged PR #15851: URL: https://github.com/apache/kafka/pull/15851
[jira] [Commented] (KAFKA-16677) Replace ClusterType#ALL and ClusterType#DEFAULT by Array
[ https://issues.apache.org/jira/browse/KAFKA-16677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844148#comment-17844148 ] Chia-Ping Tsai commented on KAFKA-16677: For another, we can simplify the code (https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java#L77) {code:java} // Some comments here ClusterConfig raftWithNewGroupCoordinator = ClusterConfig.defaultBuilder() .setTypes(Arrays.asList(KRAFT, CO_KRAFT)) .setName("newGroupCoordinator") .setServerProperties(serverProperties) .build(); {code} > Replace ClusterType#ALL and ClusterType#DEFAULT by Array > > > Key: KAFKA-16677 > URL: https://issues.apache.org/jira/browse/KAFKA-16677 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > Both ClusterType#ALL and ClusterType#DEFAULT are a kind of "tag" instead of > true "type". It seems to me they can be removed by using Array. For example: > ClusterType#ALL -> {Type.ZK, Type.KRAFT, Type.CO_KRAFT} > ClusterType#DEFAULT -> {} > There are two benefits > 1. That is more readable for "ALL type". > 2. We don't throw the awkward "exception" when seeing "DEFAULT". -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Correct connector scheduled rebalance logs [kafka]
chia7712 commented on code in PR #15875: URL: https://github.com/apache/kafka/pull/15875#discussion_r1591859505 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1768,7 +1768,7 @@ private boolean handleRebalanceCompleted() { long now = time.milliseconds(); if (scheduledRebalance <= now) { log.debug("Requesting rebalance because scheduled rebalance timeout has been reached " -+ "(now: {} scheduledRebalance: {}", scheduledRebalance, now); ++ "(now: {} scheduledRebalance: {}", now, scheduledRebalance); Review Comment: Could you please remove `(` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
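The bug fixed in this diff comes from `{}` placeholders being filled strictly by position, so swapping the arguments silently swaps the logged values. The sketch below shows the effect with a hypothetical `fill` helper that mimics positional substitution; it is not the logging library's implementation, and the numbers are made up.

```java
final class PlaceholderOrderSketch {

    // Hypothetical helper: replaces each "{}" with the next argument, left to right.
    static String fill(String template, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = template.indexOf("{}", from);
            if (at < 0) {
                break; // more arguments than placeholders: ignore the rest
            }
            out.append(template, from, at).append(arg);
            from = at + 2;
        }
        return out.append(template.substring(from)).toString();
    }

    public static void main(String[] args) {
        long now = 200L;
        long scheduledRebalance = 100L;
        String template = "now: {} scheduledRebalance: {}";
        // Arguments in the wrong order: the labels no longer match the values.
        System.out.println(fill(template, scheduledRebalance, now));
        // Arguments in template order, as in the corrected line of the diff.
        System.out.println(fill(template, now, scheduledRebalance));
    }
}
```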
[jira] [Resolved] (KAFKA-16593) Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions
[ https://issues.apache.org/jira/browse/KAFKA-16593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16593. Fix Version/s: 3.8.0 Resolution: Fixed > Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions > -- > > Key: KAFKA-16593 > URL: https://issues.apache.org/jira/browse/KAFKA-16593 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > Fix For: 3.8.0 > > > as title. the test is in tools module and it does not need to depend on test > code of core module directly -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 merged PR #15766: URL: https://github.com/apache/kafka/pull/15766
Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]
chia7712 commented on code in PR #15840: URL: https://github.com/apache/kafka/pull/15840#discussion_r1591853150 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -171,120 +144,352 @@ public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception { KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient(); String brokerId = "1"; -adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); -alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter"); +AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); +alterOpts = asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter"); // Add config -alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "11"), Optional.of(brokerId)); -alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "12"), Optional.empty()); +alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), +singletonMap("message.max.size", "11")); +alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(), +singletonMap("message.max.size", "12")); // Change config -alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "13"), Optional.of(brokerId)); -alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "14"), Optional.empty()); +alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), +singletonMap("message.max.size", "13")); +alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(), +singletonMap("message.max.size", "14")); // Delete config -deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.of(brokerId)); -deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.empty()); +deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), +singleton("message.max.size")); +deleteAndVerifyConfig(zkClient, adminZkClient, 
Optional.empty(), +singleton("message.max.size")); // Listener configs: should work only with listener name -alterAndVerifyConfig(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId)); +alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), +singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")); assertThrows(ConfigException.class, -() -> alterConfigWithZk(zkClient, Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId))); +() -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), +singletonMap("ssl.keystore.location", "/tmp/test.jks"))); // Per-broker config configured at default cluster-level should fail assertThrows(ConfigException.class, -() -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.empty())); -deleteAndVerifyConfig(zkClient, Collections.singleton("listener.name.external.ssl.keystore.location"), Optional.of(brokerId)); +() -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(), + singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"))); +deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), +singleton("listener.name.external.ssl.keystore.location")); // Password config update without encoder secret should fail assertThrows(IllegalArgumentException.class, -() -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.password", "secret"), Optional.of(brokerId))); +() -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("listener.name.external.ssl.keystore.password", "secret"))); // Password config update with encoder secret should succeed and encoded password must be stored in ZK Map configs = new HashMap<>(); configs.put("listener.name.external.ssl.keystore.password", "secret"); 
configs.put("log.cleaner.threads", "2"); -Map encoderConfigs = Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); -alterConfigWithZk(zkClient, configs, Optional.of(brokerId), encoderConfigs); +Map encoderConfigs = new HashMap<>(configs); +