[jira] [Updated] (KAFKA-16250) Consumer group coordinator should perform sanity check on the offset commits.

2024-05-07 Thread Calvin Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Calvin Liu updated KAFKA-16250:
---
Description: 
The current coordinator does not validate offset commits before persisting 
them in the record.

In a real case, though, I am not sure why the consumer generated offset 
commits with a consumer offset of -2; this "illegal" consumer offset value 
caused confusion in the admin CLI when describing the consumer group. The 
consumer offset field is marked "".

 

 

  was:
The current coordinator does not validate the offset commits before persisting 
it in the record.

In a real case, though, I am not sure why the consumer generates the offset 
commits with a consumer offset valued at -2, the "illegal" consumer offset 
value caused confusion with the admin cli when describing the consumer group. 
The consumer offset field is marked "-".

 

 


> Consumer group coordinator should perform sanity check on the offset commits.
> -
>
> Key: KAFKA-16250
> URL: https://issues.apache.org/jira/browse/KAFKA-16250
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Calvin Liu
>Priority: Major
>
> The current coordinator does not validate offset commits before 
> persisting them in the record.
> In a real case, though, I am not sure why the consumer generated offset 
> commits with a consumer offset of -2; this "illegal" consumer offset 
> value caused confusion in the admin CLI when describing the consumer group. 
> The consumer offset field is marked "".
>  
>  
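For illustration, the kind of sanity check being requested might look roughly like this (an editorial sketch with hypothetical names, not the actual group coordinator code):

{code:java}
// Hypothetical pre-persist check: reject offset commits with negative offsets
// before they are written to the __consumer_offsets log.
final class OffsetCommitSanityCheck {
    static void validate(String topic, int partition, long committedOffset) {
        if (committedOffset < 0) {
            throw new IllegalArgumentException(
                "Rejecting offset commit for " + topic + "-" + partition
                    + ": offset " + committedOffset + " must be non-negative");
        }
    }
}
{code}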





Re: [PR] Kafka-16668: Add tags support in ClusterTestExtension [kafka]

2024-05-07 Thread via GitHub


johnnychhsu commented on code in PR #15861:
URL: https://github.com/apache/kafka/pull/15861#discussion_r1592759299


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -153,15 +148,19 @@ public Map<Integer, Map<String, String>> perBrokerOverrideProperties() {
 return perBrokerOverrideProperties;
 }
 
-public Map<String, String> nameTags() {
-Map<String, String> tags = new LinkedHashMap<>(4);
-name().ifPresent(name -> tags.put("Name", name));
-tags.put("MetadataVersion", metadataVersion.toString());
-tags.put("Security", securityProtocol.name());
-listenerName().ifPresent(listener -> tags.put("Listener", listener));
+public String[] tags() {
 return tags;
 }
 
+public Map<String, String> nameTags() {

Review Comment:
   Since we can get the tags from the getter, why do we still need to add this 
to the displayTags?






[jira] [Commented] (KAFKA-16656) Using a custom replication.policy.separator with DefaultReplicationPolicy

2024-05-07 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844368#comment-17844368
 ] 

Chris Egerton commented on KAFKA-16656:
---

Hi [~leninjoseph], it should be possible to use a custom separator with the 
{{DefaultReplicationPolicy}} class. What are the names of the topics you're 
seeing cyclical replication for?

 

I've sketched out a unit test that can be added to the [ReplicationPolicyTest 
suite|https://github.com/apache/kafka/blob/05df10449eb9c95fe6d6055b302c84686be8058d/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java]
 that probes how the class handles custom separators. Everything appears to 
be working, but this doesn't rule out the possibility of bugs in other places:

 

{code:java}
@Test
public void testCustomSeparator() {
    DefaultReplicationPolicy policyWithCustomSeparator = new DefaultReplicationPolicy();
    Map<String, Object> config = new HashMap<>();
    config.put(DefaultReplicationPolicy.SEPARATOR_CONFIG, "-");
    policyWithCustomSeparator.configure(config);

    assertEquals("source", policyWithCustomSeparator.topicSource("source-t"));
    assertEquals("t", policyWithCustomSeparator.upstreamTopic("source-t"));
}
{code}
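For anyone who wants to poke at this outside the test suite, here is a standalone sketch of the same round-trip (an editorial illustration; the cluster alias and topic names are made up):

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;

public class SeparatorDemo {
    public static void main(String[] args) {
        DefaultReplicationPolicy policy = new DefaultReplicationPolicy();
        Map<String, Object> config = new HashMap<>();
        config.put(DefaultReplicationPolicy.SEPARATOR_CONFIG, "-");
        policy.configure(config);

        // With separator "-", cluster alias "source" plus topic "t" becomes "source-t".
        System.out.println(policy.formatRemoteTopic("source", "t")); // source-t
        // topicSource/upstreamTopic invert that naming.
        System.out.println(policy.topicSource("source-t"));          // source
        System.out.println(policy.upstreamTopic("source-t"));        // t
    }
}
{code}

One plausible source of cycles worth checking: topic names that already contain the separator character, which can make {{topicSource}} treat a local topic as if it were remote.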

> Using a custom replication.policy.separator with DefaultReplicationPolicy
> -
>
> Key: KAFKA-16656
> URL: https://issues.apache.org/jira/browse/KAFKA-16656
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.5.1
>Reporter: Lenin Joseph
>Priority: Major
>
> Hi,
> In the case of bidirectional replication using mm2, when we tried using a 
> custom replication.policy.separator (e.g. "-") with DefaultReplicationPolicy, 
> we see cyclic replication of topics. Could you confirm whether it's mandatory 
> to use a CustomReplicationPolicy whenever we want to use a separator other 
> than "." ?
> Regards, 
> Lenin





Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-05-07 Thread via GitHub


viktorsomogyi commented on PR #15697:
URL: https://github.com/apache/kafka/pull/15697#issuecomment-2098821932

   @soarez in the end I chose the shortcut regarding detecting leaders before 
shutdown. The reasoning is complex, because the solution that would be 
required here is complex too.
   First, the sequence of events is problematic. We first update the 
`LogManager` and then try to propagate the event to the controller. At this 
point the metadata is stale, so I can't use it as reliable information to 
detect whether partitions have leadership or not. A workaround would be to 
subtract the LogManager's data from the metadata cache (i.e. if there is only 
a single ISR replica and that is the current one, then we can accept it as 
offline in reality). I don't feel that is a robust solution; it could be prone 
to race conditions on the network, depending on how requests come from the 
controller as long as it's alive. I think it's more robust to just fail if we 
can't contact the controller.
   The second reason is a bit technical and can be worked around, although it 
requires a lot of effort. When extracting replica->logdir information from 
`LogManager`, the only information about logdirs given by the event is the 
`Uuid`. Unfortunately `LogManager` doesn't store the `Uuid` of an offline dir 
(and besides, I don't think `Uuid`s and logdir names are used consistently 
across the whole module). This could be solved by propagating both the logdir 
and the `Uuid` in the events, or by storing offline dirs' `Uuid`s in 
`LogManager`. I think the latter is problematic because we can't know how long 
we should keep information about offline dirs, as they might never come back. 
The former can be done, although it could be a sizeable refactor, and 
generally I felt that choosing the simpler route now is more robust.
   Let me know if you think we should try it.





[jira] [Commented] (KAFKA-16108) Backport fix for KAFKA-16093 to 3.7

2024-05-07 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844365#comment-17844365
 ] 

Igor Soarez commented on KAFKA-16108:
-

[~ChrisEgerton] is this still relevant? The priority field is set to Blocker, 
but the description says it's not a blocker. What's the correct priority?

> Backport fix for KAFKA-16093 to 3.7
> ---
>
> Key: KAFKA-16108
> URL: https://issues.apache.org/jira/browse/KAFKA-16108
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.7.1
>
>
> A fix for KAFKA-16093 is present on the branches trunk (the version for which 
> is currently 3.8.0-SNAPSHOT) and 3.6. We are in code freeze for the 3.7.0 
> release, and this issue is not a blocker, so it cannot be backported right 
> now.
> We should backport the fix once 3.7.0 has been released and before 3.7.1 is 
> released.





Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-07 Thread via GitHub


raminqaf commented on PR #15601:
URL: https://github.com/apache/kafka/pull/15601#issuecomment-2098817942

   @gharris1727 Thanks for the feedback! I reverted all the changes you 
requested and reverted a couple of other indentation problems that caused a 
diff. I can even go further and revert & inline the introduced private methods 
(i.e., `emitInnerJoin` or `putInOuterJoinStore`).





[jira] [Created] (KAFKA-16686) Flakey tests in TopicBasedRemoteLogMetadataManagerTest

2024-05-07 Thread Gaurav Narula (Jira)
Gaurav Narula created KAFKA-16686:
-

 Summary: Flakey tests in TopicBasedRemoteLogMetadataManagerTest
 Key: KAFKA-16686
 URL: https://issues.apache.org/jira/browse/KAFKA-16686
 Project: Kafka
  Issue Type: Test
  Components: Tiered-Storage
Reporter: Gaurav Narula
Assignee: Gaurav Narula


Tests in {{TopicBasedRemoteLogMetadataManagerTest}} flake because 
{{waitUntilConsumerCatchesUp}} may return before all expected metadata is 
caught up.

Flakiness report 
[here|https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest].
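A stricter wait would block until every expected record has been observed, instead of returning after the first match. A rough sketch of that shape, with hypothetical names (the real fix lives in the test utilities):

{code:java}
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.IntSupplier;

final class MetadataWaitUtil {
    // Poll until all expected metadata records have been consumed, or time out.
    static void waitUntilAllMetadataObserved(IntSupplier observedCount, int expected, Duration timeout)
            throws InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (observedCount.getAsInt() < expected) {
            if (System.nanoTime() >= deadlineNanos) {
                throw new TimeoutException("Observed " + observedCount.getAsInt()
                    + " of " + expected + " metadata records");
            }
            Thread.sleep(100L);
        }
    }
}
{code}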





Re: [PR] KAFKA-16287: Implement example tests for common rebalance callback [kafka]

2024-05-07 Thread via GitHub


cadonna commented on code in PR #15408:
URL: https://github.com/apache/kafka/pull/15408#discussion_r1592700226


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala:
##
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package integration.kafka.api
+
+import kafka.api.{AbstractConsumerTest, BaseConsumerTest}
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener}
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.util
+import java.util.Arrays.asList
+import java.util.Collections
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.stream.Stream
+
+/**
+ * Integration tests for the consumer that cover interaction with the consumer from within callbacks
+ * and listeners.
+ */
+class PlaintextConsumerCallbackTest extends AbstractConsumerTest {
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumerRebalanceListenerAssignOnPartitionsAssigned(quorum: String, groupProtocol: String): Unit = {
+    val tp = new TopicPartition(topic, 0);
+    triggerOnPartitionsAssigned { (consumer, _) =>
+      val e: Exception = assertThrows(classOf[IllegalStateException], () => consumer.assign(Collections.singletonList(tp)))
+      assertEquals(e.getMessage, "Subscription to topics, partitions and pattern are mutually exclusive")

Review Comment:
   Are you proposing to change the message of the exception that is verified 
here?
   






Re: [PR] MINOR: enable test for ensureInternalEndpointIsSecured [kafka]

2024-05-07 Thread via GitHub


FrankYang0529 commented on code in PR #15868:
URL: https://github.com/apache/kafka/pull/15868#discussion_r1592700292


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java:
##
@@ -115,11 +112,11 @@ public void ensureInternalEndpointIsSecured() throws Throwable {
 // Try again, but with an invalid signature
 log.info(
 "Making a POST request to the {} endpoint with no connector started and an invalid signature header; "
-+ "expecting 403 error response",
++ "expecting 503 error response",
 connectorTasksEndpoint
 );
 assertEquals(
-FORBIDDEN.getStatusCode(),
+SERVICE_UNAVAILABLE.getStatusCode(),

Review Comment:
   This was added in https://github.com/apache/kafka/pull/11783.






Re: [PR] MINOR: enable test for ensureInternalEndpointIsSecured [kafka]

2024-05-07 Thread via GitHub


FrankYang0529 commented on code in PR #15868:
URL: https://github.com/apache/kafka/pull/15868#discussion_r1592697934


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java:
##
@@ -115,11 +112,11 @@ public void ensureInternalEndpointIsSecured() throws Throwable {
 // Try again, but with an invalid signature
 log.info(
 "Making a POST request to the {} endpoint with no connector started and an invalid signature header; "
-+ "expecting 403 error response",
++ "expecting 503 error response",
 connectorTasksEndpoint
 );
 assertEquals(
-FORBIDDEN.getStatusCode(),
+SERVICE_UNAVAILABLE.getStatusCode(),

Review Comment:
   Here, if the connector is not started, there is no `sessionKey`.
   
   
https://github.com/apache/kafka/blob/05df10449eb9c95fe6d6055b302c84686be8058d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2748-L2752
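   For context, the linked check rejects internal requests before any session key exists; paraphrased as a sketch (the exception message and surrounding code are assumed, not quoted):
   
   ```java
   // If no connector has started yet, the worker has not read a session key from
   // the config topic, so signed internal requests cannot be verified at all.
   // The herder therefore answers 503 (try again later) rather than 403 (bad signature).
   if (sessionKey == null) {
       throw new ConnectRestException(
           Response.Status.SERVICE_UNAVAILABLE.getStatusCode(),
           "Worker has not yet read a session key from the config topic");
   }
   ```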






[jira] [Assigned] (KAFKA-16681) Rewrite MiniKDC by Java

2024-05-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16681:
--

Assignee: PoAn Yang  (was: Chia-Ping Tsai)

> Rewrite MiniKDC by Java
> ---
>
> Key: KAFKA-16681
> URL: https://issues.apache.org/jira/browse/KAFKA-16681
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Major
>
> Noted:
>  # we need to move it from the scala folder to the java folder
>  # don't change the package name since the system tests require it





[jira] [Resolved] (KAFKA-13329) Connect does not perform preflight validation for per-connector key and value converters

2024-05-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13329.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Connect does not perform preflight validation for per-connector key and value 
> converters
> 
>
> Key: KAFKA-13329
> URL: https://issues.apache.org/jira/browse/KAFKA-13329
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> Users may specify a key and/or value converter class for their connector 
> directly in the configuration for that connector. If this occurs, no 
> preflight validation is performed to ensure that the specified converter is 
> valid.
> Unfortunately, the [Converter 
> interface|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java]
>  does not require converters to expose a {{ConfigDef}} (unlike the 
> [HeaderConverter 
> interface|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L48-L52],
>  which does have that requirement), so it's unlikely that the configuration 
> properties of the converter itself can be validated.
> However, we can and should still validate that the converter class exists, 
> can be instantiated (i.e., has a public, no-args constructor and is a 
> concrete, non-abstract class), and implements the {{Converter}} interface.
> *EDIT:* Since this ticket was originally filed, a {{Converter::config}} 
> method was added in 
> [KIP-769|https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+APIs+to+list+all+connector+plugins+and+retrieve+their+configuration+definitions].
>  We can now utilize that config definition during preflight validation for 
> connectors.
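A minimal sketch of those class-level checks (an illustrative helper, not the actual Connect implementation; instantiation failures such as abstract classes or missing no-args constructors surface as {{ReflectiveOperationException}}):

{code:java}
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.storage.Converter;

final class ConverterPreflight {
    // Verify the configured class exists, implements Converter, and can be instantiated.
    static Converter instantiate(String className) {
        try {
            Class<?> klass = Class.forName(className);
            if (!Converter.class.isAssignableFrom(klass)) {
                throw new ConfigException("Class " + className + " does not implement Converter");
            }
            return (Converter) klass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new ConfigException("Unable to instantiate converter class " + className + ": " + e.getMessage());
        }
    }
}
{code}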





[jira] [Resolved] (KAFKA-13328) Connect does not perform preflight validation for per-connector header converters

2024-05-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13328.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Connect does not perform preflight validation for per-connector header 
> converters
> -
>
> Key: KAFKA-13328
> URL: https://issues.apache.org/jira/browse/KAFKA-13328
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> Users may specify a header converter class for their connector directly in 
> the configuration for that connector. If this occurs, no preflight validation 
> is performed to ensure that the specified converter is valid.
> {{HeaderConverter}} implementations are required to provide a valid 
> {{ConfigDef}} to the Connect framework via 
> [HeaderConverter::config|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L48-L52],
>  but this object isn't actually leveraged anywhere by Connect.
> Connect should make use of this config object during preflight validation for 
> connectors to fail faster when their header converters are misconfigured.





[jira] [Commented] (KAFKA-16681) Rewrite MiniKDC by Java

2024-05-07 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844345#comment-17844345
 ] 

PoAn Yang commented on KAFKA-16681:
---

Hi [~chia7712], I'm interested in this. If you are not working on it, may I 
assign it to myself? Thank you.

> Rewrite MiniKDC by Java
> ---
>
> Key: KAFKA-16681
> URL: https://issues.apache.org/jira/browse/KAFKA-16681
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> Noted:
>  # we need to move it from the scala folder to the java folder
>  # don't change the package name since the system tests require it





Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-07 Thread via GitHub


C0urante merged PR #14309:
URL: https://github.com/apache/kafka/pull/14309





Re: [PR] Allowing WriteTxnMarkers API to run with AlterCluster permissions [kafka]

2024-05-07 Thread via GitHub


jolshan commented on PR #15837:
URL: https://github.com/apache/kafka/pull/15837#issuecomment-2098727340

   Sorry I was out of town (at KSB). I will try to take a look today, but 
thanks Luke for approving as well :) 





Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-07 Thread via GitHub


C0urante commented on PR #14309:
URL: https://github.com/apache/kafka/pull/14309#issuecomment-2098726331

   Thanks @gharris1727. I've reverted the changes to `Utils.java` and verified 
locally with tests. Everything else looked okay on the previous CI run, going 
to merge.





Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-07 Thread via GitHub


gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1591293880


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -184,20 +146,34 @@ public void process(final Record record) {
 // This condition below allows us to process the out-of-order records without the need
 // to hold it in the temporary outer store
 if (!outerJoinStore.isPresent() || timeTo < sharedTimeTracker.streamTime) {
-context().forward(record.withValue(joiner.apply(record.key(), record.value(), null)));
+context().forward(
+record.withValue(joiner.apply(record.key(), record.value(), null)));

Review Comment:
   nit: revert



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -43,95 +42,73 @@
 import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin implements ProcessorSupplier {
+abstract class KStreamKStreamJoin implements ProcessorSupplier {
 private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);
-
-private final String otherWindowName;
+private final boolean outer;
+private final ValueJoinerWithKey joiner;
 private final long joinBeforeMs;
 private final long joinAfterMs;
 private final long joinGraceMs;
+private final String otherWindowName;
+private final TimeTrackerSupplier sharedTimeTrackerSupplier;
 private final boolean enableSpuriousResultFix;
+private final Optional outerJoinWindowName;
 private final long windowsBeforeMs;
 private final long windowsAfterMs;
 
-private final boolean outer;
-private final boolean isLeftSide;
-private final Optional outerJoinWindowName;
-private final ValueJoinerWithKey joiner;
-
-private final TimeTrackerSupplier sharedTimeTrackerSupplier;
-
-KStreamKStreamJoin(final boolean isLeftSide,
-   final String otherWindowName,
-   final JoinWindowsInternal windows,
-   final ValueJoinerWithKey joiner,
-   final boolean outer,
-   final Optional outerJoinWindowName,
-   final TimeTrackerSupplier sharedTimeTrackerSupplier) {
-this.isLeftSide = isLeftSide;
+KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier sharedTimeTrackerSupplier,
+   final Optional outerJoinWindowName, final long joinBeforeMs,
+   final long joinAfterMs, final JoinWindowsInternal windows, final boolean outer,
+   final ValueJoinerWithKey joiner) {

Review Comment:
   Revert the re-ordering here that isn't necessary, so it's easy to see the 
signature change for `joiner` and the removal of `isLeftSide`.
   
   We should also put each constructor argument on its own line to follow the 
previous style.




Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-07 Thread via GitHub


FrankYang0529 commented on PR #15821:
URL: https://github.com/apache/kafka/pull/15821#issuecomment-2098714976

   Hi @chia7712, I rebased onto the latest trunk branch, so we have 
`ConsumerGroupCommandTestUtils` now. The only remaining part is 
`SimpleConsumerGroupExecutor`, which is used by `ListConsumerGroupTest` and 
`DescribeConsumerGroupTest`. I think we can create a new class 
`SimpleConsumerGroupExecutorTestUtils` for it. WDYT? Thank you.
   
   
https://github.com/apache/kafka/blob/21bf715622e9d05984fa8a2a1f9f12d54b76ce41/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java#L327-L335





Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-07 Thread via GitHub


C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1592678706


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector
 return configDef.validateAll(config);
 }
 
+/**
+ * General-purpose validation logic for converters that are configured directly
+ * in a connector config (as opposed to inherited from the worker config).
+ * @param connectorConfig the configuration for the connector; may not be null
+ * @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config;
+ *  may be null, in which case no validation will be performed under the assumption that the
+ *  connector will inherit the converter settings from the worker
+ * @param pluginInterface the interface for the plugin type
+ *(e.g., {@code org.apache.kafka.connect.storage.Converter.class}); may not be null
+ * @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef}
+ *  from an instance of the plugin type (e.g., {@code Converter::config}); may not be null
+ * @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"});
+ *   may not be null
+ * @param pluginProperty the property used to define a custom class for the plugin type
+ *   in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG}); may not be null
+ * @param defaultProperties any default properties to include in the configuration that will be used for
+ *  the plugin; may be null
+
+ * @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config,
+ * or null if no custom validation was performed (possibly because no custom plugin was defined in the connector
+ * config)
+
+ * @param <T> the plugin class to perform validation for
+ */
+private <T> ConfigInfos validateConverterConfig(
+Map<String, String> connectorConfig,
+ConfigValue pluginConfigValue,
+Class<T> pluginInterface,
+Function<T, ConfigDef> configDefAccessor,
+String pluginName,
+String pluginProperty,
+Map<String, String> defaultProperties
+) {
+Objects.requireNonNull(connectorConfig);
+Objects.requireNonNull(pluginInterface);
+Objects.requireNonNull(configDefAccessor);
+Objects.requireNonNull(pluginName);
+Objects.requireNonNull(pluginProperty);
+
+String pluginClass = connectorConfig.get(pluginProperty);
+
+if (pluginClass == null
+|| pluginConfigValue == null
+|| !pluginConfigValue.errorMessages().isEmpty()
+) {
+// Either no custom converter was specified, or one was specified but there's a problem with it.
+// No need to proceed any further.
+return null;
+}
+
+T pluginInstance;
+try {
+pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+} catch (ClassNotFoundException | RuntimeException e) {
+log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", pluginName, pluginClass, e);
+pluginConfigValue.addErrorMessage("Failed to load class " + pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+return null;
+}
+
+try {
+ConfigDef configDef;
+try {
+configDef = configDefAccessor.apply(pluginInstance);
+} catch (RuntimeException e) {
+log.error("Failed to load ConfigDef from {} of type {}", pluginName, pluginClass, e);
+pluginConfigValue.addErrorMessage("Failed to load ConfigDef from " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+return null;
+}
+if (configDef == null) {
+log.warn("{}.config() has returned a null ConfigDef; no further preflight config validation for this converter will be performed", pluginClass);
+// Older versions of Connect didn't do any converter validation.
+// Even though converters are technically required to return a non-null ConfigDef object from their config() method,
+// we permit this case in order to avoid breaking existing converters that, despite not adhering to this requirement,
+// can be used successfully with a connector.
+return null;
+}
+final String 

Re: [PR] MINOR: use classic consumer with ZK mode for DeleteOffsetsConsumerGroupCommandIntegrationTest [kafka]

2024-05-07 Thread via GitHub


FrankYang0529 commented on code in PR #15872:
URL: https://github.com/apache/kafka/pull/15872#discussion_r1592664550


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -88,7 +113,7 @@ public void testDeleteOffsetsNonExistingGroup() {
 }
 }
 
-@ClusterTest
+@ClusterTemplate("generator")
 public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
 for (Map consumerConfig: consumerConfigs) {

Review Comment:
   Updated it. Thanks.






Re: [PR] MINOR: enable test for ensureInternalEndpointIsSecured [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on code in PR #15868:
URL: https://github.com/apache/kafka/pull/15868#discussion_r1592649000


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java:
##
@@ -115,11 +112,11 @@ public void ensureInternalEndpointIsSecured() throws Throwable {
 // Try again, but with an invalid signature
 log.info(
 "Making a POST request to the {} endpoint with no connector started and an invalid signature header; "
-+ "expecting 403 error response",
++ "expecting 503 error response",
 connectorTasksEndpoint
 );
 assertEquals(
-FORBIDDEN.getStatusCode(),
+SERVICE_UNAVAILABLE.getStatusCode(),

Review Comment:
   Could you share the details with me?
   






[PR] MINOR: merge unit test down to the class of integration test [kafka]

2024-05-07 Thread via GitHub


KevinZTW opened a new pull request, #15884:
URL: https://github.com/apache/kafka/pull/15884

   Merge the unit test functions down into the integration test class to avoid 
having two test classes in the same file.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16645) CVEs in 3.7.0 docker image

2024-05-07 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844334#comment-17844334
 ] 

Igor Soarez commented on KAFKA-16645:
-

The vulnerability report flags {{libexpat}} version {{2.5.0-r2}}. Both 
{{apache/kafka:3.7.0}} and {{apache/kafka:latest}} ship with the library:
{code:java}
$ docker run --rm -it apache/kafka:3.7.0 -- apk list | grep libexpat
libexpat-2.5.0-r2 aarch64 {expat} (MIT) [installed]
$ docker run --rm -it apache/kafka:latest -- apk list | grep libexpat
libexpat-2.5.0-r2 aarch64 {expat} (MIT) [installed]{code}
Neither Kafka nor its container image directly depends on {{libexpat}}. The 
library is instead bundled into the [base image 
{{eclipse-temurin:21-jre-alpine}}|https://github.com/apache/kafka/blob/21bf715622e9d05984fa8a2a1f9f12d54b76ce41/docker/jvm/Dockerfile#L44].
{code:java}
$ docker run --rm -it eclipse-temurin:21-jre-alpine -- apk list | grep libexpat
libexpat-2.6.2-r0 aarch64 {expat} (MIT) [installed]
$ docker inspect eclipse-temurin:21-jre-alpine | jq -r '.[0].Created'
2024-04-23T20:51:38Z
$ docker inspect apache/kafka:3.7.0 | jq -r '.[0].Created'
2024-02-09T14:51:42.808028351Z
$ docker inspect apache/kafka:latest | jq -r '.[0].Created'
2024-02-09T14:51:42.808028351Z{code}
The vulnerability has already been addressed in the base image, under the same 
image tag. To confirm, I ran the vulnerability scanner against a locally built 
image.
{code:java}
$ python docker_build_test.py kafka/test -tag=localkafkaimg -type=jvm -u=https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
(...)
$ docker run -it -v /var/run/docker.sock:/var/run/docker.sock aquasec/trivy image kafka/test:localkafkaimg -s CRITICAL,HIGH
2024-05-07T14:57:18Z    INFO    Need to update DB
2024-05-07T14:57:18Z    INFO    Downloading DB...       repository="ghcr.io/aquasecurity/trivy-db:2"
45.90 MiB / 45.90 MiB [] 100.00% 20.98 MiB p/s 2.4s
2024-05-07T14:57:22Z    INFO    Vulnerability scanning is enabled
2024-05-07T14:57:22Z    INFO    Secret scanning is enabled
2024-05-07T14:57:22Z    INFO    If your scanning is slow, please try '--scanners vuln' to disable secret scanning
2024-05-07T14:57:22Z    INFO    Please see also https://aquasecurity.github.io/trivy/v0.51/docs/scanner/secret/#recommendation for faster secret detection
2024-05-07T14:57:23Z    INFO    Java DB Repository      repository=ghcr.io/aquasecurity/trivy-java-db:1
2024-05-07T14:57:23Z    INFO    Downloading the Java DB...
606.06 MiB / 606.06 MiB [] 100.00% 24.19 MiB p/s 25s
2024-05-07T14:57:49Z    INFO    The Java DB is cached for 3 days. If you want to update the database more frequently, the '--reset' flag clears the DB cache.
2024-05-07T14:57:49Z    INFO    Detected OS     family="alpine" version="3.19.1"
2024-05-07T14:57:49Z    INFO    [alpine] Detecting vulnerabilities...   os_version="3.19" repository="3.19" pkg_num=43
2024-05-07T14:57:49Z    INFO    Number of language-specific files       num=1
2024-05-07T14:57:49Z    INFO    [jar] Detecting vulnerabilities...

kafka/test:localkafkaimg (alpine 3.19.1)

Total: 0 (HIGH: 0, CRITICAL: 0){code}
I don't think we republish releases without a version change, so unless we want 
to create an exception for container images and republish the {{3.7.0}} and 
{{latest}} tags now, I propose we take no action here, as the next images built 
will not have these issues.

Please let me know if you disagree.

> CVEs in 3.7.0 docker image
> --
>
> Key: KAFKA-16645
> URL: https://issues.apache.org/jira/browse/KAFKA-16645
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1
>
>
> Our [Docker Image CVE 
> Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub 
> action reports 2 high CVEs in our base image:
> apache/kafka:3.7.0 (alpine 3.19.1)
> ==
> Total: 2 (HIGH: 2, CRITICAL: 0)
> │ Library  │ Vulnerability  │ Severity │ Status │ Installed Version │ Fixed Version │ Title │
> │ libexpat │ CVE-2023-52425 │ HIGH     │ fixed  │ 2.5.0-r2          │ 2.6.0-r0      │ expat: parsing large tokens can trigger a denial of 

[PR] MINOR: Rewrite OptimizedUniformAssignmentBuilder#assignStickyPartitions to improve performance [kafka]

2024-05-07 Thread via GitHub


dajac opened a new pull request, #15883:
URL: https://github.com/apache/kafka/pull/15883

   This PR is still WIP. I am still playing with an alternative approach.
   
   Trunk:
   ```
   Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionModel)  (topicCount)  Mode  Cnt   Score   Error  Units
   ServerSideAssignorBenchmark.doAssignment       INCREMENTAL         UNIFORM          false              1                         10          HOMOGENEOUS           100  avgt    5  27.636 ± 0.131  ms/op
   ```
   
   Patch:
   ```
   Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionModel)  (topicCount)  Mode  Cnt   Score   Error  Units
   ServerSideAssignorBenchmark.doAssignment       INCREMENTAL         UNIFORM          false              1                         10          HOMOGENEOUS           100  avgt    5  20.868 ± 0.320  ms/op
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on code in PR #15863:
URL: https://github.com/apache/kafka/pull/15863#discussion_r1592508554


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -101,6 +101,7 @@ class LogCleaner(initialConfig: CleanerConfig,
  time: Time = Time.SYSTEM) extends Logging with BrokerReconfigurable {
   // Visible for test.
   private[log] val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  activateMetrics()

Review Comment:
   As it gets called in `startup`, do we need to call it during construction?



##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -118,6 +118,23 @@ class LogCleanerTest extends Logging {
 }
   }
 
+  @Test
+  def testMetricsActiveAfterReconfiguration(): Unit = {
+val logCleaner = new LogCleaner(new CleanerConfig(true),
+  logDirs = Array(TestUtils.tempDir()),
+  logs = new Pool[TopicPartition, UnifiedLog](),
+  logDirFailureChannel = new LogDirFailureChannel(1),
+  time = time)
+
+try {
+  logCleaner.reconfigure(new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181")),
+new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181")))
+
+  LogCleaner.MetricNames.foreach(name => assertNotNull(KafkaYammerMetrics.defaultRegistry.allMetrics().get(logCleaner.metricsGroup

Review Comment:
   We can use `MetricName#getName` to simplify the code. For example:
   ```scala
 val nonexistent = LogCleaner.MetricNames.diff(KafkaYammerMetrics.defaultRegistry.allMetrics().keySet().asScala.map(_.getName))
 assertEquals(0, nonexistent.size, s"$nonexistent should be existent")
   ```



##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -118,6 +118,23 @@ class LogCleanerTest extends Logging {
 }
   }
 
+  @Test
+  def testMetricsActiveAfterReconfiguration(): Unit = {
+val logCleaner = new LogCleaner(new CleanerConfig(true),
+  logDirs = Array(TestUtils.tempDir()),
+  logs = new Pool[TopicPartition, UnifiedLog](),
+  logDirFailureChannel = new LogDirFailureChannel(1),
+  time = time)
+

Review Comment:
   Could you please add `startup` and then check the metrics get created?






Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]

2024-05-07 Thread via GitHub


chiacyu commented on code in PR #15863:
URL: https://github.com/apache/kafka/pull/15863#discussion_r1592516715


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -159,6 +159,7 @@ class LogCleaner(initialConfig: CleanerConfig,
   cleaners += cleaner
   cleaner.start()
 }
+activateMetrics();

Review Comment:
   Already added the test on the JIRA page: 
[KAFKA-16574](https://issues.apache.org/jira/browse/KAFKA-16574). 






Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]

2024-05-07 Thread via GitHub


chiacyu commented on code in PR #15863:
URL: https://github.com/apache/kafka/pull/15863#discussion_r1592509827


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -182,6 +183,27 @@ class LogCleaner(initialConfig: CleanerConfig,
 cleanerManager.removeMetrics()

Review Comment:
   Yes, the metrics remain removed after reconfiguring. Would it be a good idea 
to remove this `cleanerManager.removeMetrics()` line?






[jira] [Resolved] (KAFKA-16307) fix EventAccumulator thread idle ratio metric

2024-05-07 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16307.
-
Fix Version/s: 3.8.0
 Reviewer: David Jacot
   Resolution: Fixed

> fix EventAccumulator thread idle ratio metric
> -
>
> Key: KAFKA-16307
> URL: https://issues.apache.org/jira/browse/KAFKA-16307
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
> Fix For: 3.8.0
>
>
> The metric does not seem to be accurate, nor does it report at every 
> interval. Requires investigation.





Re: [PR] KAFKA-16307: fix coordinator thread idle ratio [kafka]

2024-05-07 Thread via GitHub


dajac merged PR #15835:
URL: https://github.com/apache/kafka/pull/15835





[PR] KAFKA-10199: Handle assignment with new remove operation in state updater [kafka]

2024-05-07 Thread via GitHub


cadonna opened a new pull request, #15882:
URL: https://github.com/apache/kafka/pull/15882

   Uses the new remove operation of the state updater that returns
   a future to handle task assignment.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] KAFKA-16640: Replace TestUtils#resource by scala.util.Using [kafka]

2024-05-07 Thread via GitHub


frankvicky opened a new pull request, #15881:
URL: https://github.com/apache/kafka/pull/15881

   Check all uses of `TestUtils#resource` and replace them with `scala.util.Using`
   
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] KAFKA-16685: Add parent exception to RLMTask warning logs [kafka]

2024-05-07 Thread via GitHub


jeqo opened a new pull request, #15880:
URL: https://github.com/apache/kafka/pull/15880

   
   [KAFKA-16685]





[jira] [Updated] (KAFKA-16685) RLMTask warning logs do not include parent exception trace

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya updated KAFKA-16685:
-
Summary: RLMTask warning logs do not include parent exception trace  (was: 
RLMTask warn logs do not include parent exception trace)

> RLMTask warning logs do not include parent exception trace
> --
>
> Key: KAFKA-16685
> URL: https://issues.apache.org/jira/browse/KAFKA-16685
> Project: Kafka
>  Issue Type: Improvement
>  Components: Tiered-Storage
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> When RSMTask exceptions happen and are logged, it only includes the exception 
> message, but we lose the stack trace.
> See 
> [https://github.com/apache/kafka/blob/70b8c5ae8e9336dbf4792e2d3cf100bbd70d6480/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L821-L831]
> This makes it difficult to troubleshoot issues.





[jira] [Updated] (KAFKA-16685) RLMTask warn logs do not include parent exception trace

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya updated KAFKA-16685:
-
Summary: RLMTask warn logs do not include parent exception trace  (was: RSM 
Task warn logs do not include parent exception trace)

> RLMTask warn logs do not include parent exception trace
> ---
>
> Key: KAFKA-16685
> URL: https://issues.apache.org/jira/browse/KAFKA-16685
> Project: Kafka
>  Issue Type: Improvement
>  Components: Tiered-Storage
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> When RSMTask exceptions happen and are logged, it only includes the exception 
> message, but we lose the stack trace.
> See 
> [https://github.com/apache/kafka/blob/70b8c5ae8e9336dbf4792e2d3cf100bbd70d6480/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L821-L831]
> This makes it difficult to troubleshoot issues.





[jira] [Updated] (KAFKA-16685) RLMTask warning logs do not include parent exception trace

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya updated KAFKA-16685:
-
Description: 
When RLMTask warning exceptions happen and are logged, only the exception 
message is included; we lose the stack trace.

See 
[https://github.com/apache/kafka/blob/70b8c5ae8e9336dbf4792e2d3cf100bbd70d6480/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L821-L831]

This makes it difficult to troubleshoot issues.

  was:
When RSMTask exceptions happen and are logged, it only includes the exception 
message, but we lose the stack trace.

See 
[https://github.com/apache/kafka/blob/70b8c5ae8e9336dbf4792e2d3cf100bbd70d6480/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L821-L831]

This makes it difficult to troubleshoot issues.


> RLMTask warning logs do not include parent exception trace
> --
>
> Key: KAFKA-16685
> URL: https://issues.apache.org/jira/browse/KAFKA-16685
> Project: Kafka
>  Issue Type: Improvement
>  Components: Tiered-Storage
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> When RLMTask warning exceptions happen and are logged, only the exception 
> message is included; we lose the stack trace.
> See 
> [https://github.com/apache/kafka/blob/70b8c5ae8e9336dbf4792e2d3cf100bbd70d6480/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L821-L831]
> This makes it difficult to troubleshoot issues.
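For illustration, the difference in SLF4J (which Kafka logs through) is just whether the throwable is passed as the final argument (an editorial sketch; names are made up):

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class RlmTaskLoggingExample {
    private static final Logger LOG = LoggerFactory.getLogger(RlmTaskLoggingExample.class);

    void run() {
        try {
            throw new IllegalStateException("remote copy failed");
        } catch (Exception e) {
            // Only the message is interpolated; the stack trace is lost.
            LOG.warn("Error occurred while copying segments, message: {}", e.getMessage());
            // Passing the exception as the last argument preserves the full stack trace.
            LOG.warn("Error occurred while copying segments", e);
        }
    }
}
{code}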





[jira] [Created] (KAFKA-16685) RSM Task warn logs do not include parent exception trace

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-16685:


 Summary: RSM Task warn logs do not include parent exception trace
 Key: KAFKA-16685
 URL: https://issues.apache.org/jira/browse/KAFKA-16685
 Project: Kafka
  Issue Type: Improvement
  Components: Tiered-Storage
Reporter: Jorge Esteban Quilcate Otoya
Assignee: Jorge Esteban Quilcate Otoya


When RSMTask exceptions happen and are logged, it only includes the exception 
message, but we lose the stack trace.

See 
[https://github.com/apache/kafka/blob/70b8c5ae8e9336dbf4792e2d3cf100bbd70d6480/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L821-L831]

This makes it difficult to troubleshoot issues.





Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]

2024-05-07 Thread via GitHub


cadonna merged PR #15870:
URL: https://github.com/apache/kafka/pull/15870





Re: [PR] KAFKA-16394: Fix null propagation in foreign key join result [kafka]

2024-05-07 Thread via GitHub


AyoubOm commented on code in PR #15607:
URL: https://github.com/apache/kafka/pull/15607#discussion_r1592389047


##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##
@@ -244,21 +248,21 @@ public void doJoinFromLeftThenDeleteLeftEntity() {
 );
 }
 }
-// Now delete one LHS entity such that one delete is propagated down to the output.
 
-left.pipeInput("lhs1", (String) null, baseTimestamp + 6);
+// Now delete one LHS entity such that one delete is propagated down to the output.
+left.pipeInput("lhs1", null, baseTimestamp + 6);
 assertThat(
-outputTopic.readKeyValuesToMap(),
-is(mkMap(
-mkEntry("lhs1", null)
+outputTopic.readKeyValuesToList(),
+is(Collections.singletonList(
+KeyValue.pair("lhs1", null)
 ))
 );
 if (rejoin) {
 assertThat(
-rejoinOutputTopic.readKeyValuesToMap(),
-is(mkMap(
-mkEntry("lhs1", null)
-))
+rejoinOutputTopic.readKeyValuesToList(),
+hasItem(

Review Comment:
   Not testing all items because the test is not deterministic: it may or may 
not contain `("lhs1", "rejoin((lhsValue1|rhs1,rhsValue1),null)")`, depending on 
whether it's the left-hand or the right-hand side of the final join that 
propagates first. 
   
   `rejoin` is computed this way in the test:
   
       LEFT    RIGHT
        | \      |
        |  \     |
        |   FKJoin
        |    /
        Rejoin

   






Re: [PR] KAFKA-16394: Fix null propagation in foreign key join result [kafka]

2024-05-07 Thread via GitHub


AyoubOm commented on code in PR #15607:
URL: https://github.com/apache/kafka/pull/15607#discussion_r1592386867


##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##
@@ -244,21 +248,21 @@ public void doJoinFromLeftThenDeleteLeftEntity() {
 );
 }
 }
-// Now delete one LHS entity such that one delete is propagated down to the output.
 
-left.pipeInput("lhs1", (String) null, baseTimestamp + 6);
+// Now delete one LHS entity such that one delete is propagated down to the output.
+left.pipeInput("lhs1", null, baseTimestamp + 6);
 assertThat(
-outputTopic.readKeyValuesToMap(),
-is(mkMap(
-mkEntry("lhs1", null)
+outputTopic.readKeyValuesToList(),
+is(Collections.singletonList(
+KeyValue.pair("lhs1", null)
 ))
 );
 if (rejoin) {
 assertThat(
-rejoinOutputTopic.readKeyValuesToMap(),
-is(mkMap(
-mkEntry("lhs1", null)
-))
+rejoinOutputTopic.readKeyValuesToList(),
+hasItem(

Review Comment:
   Not testing all items because the test is not deterministic: it may or may
not contain `("lhs1", "rejoin((lhsValue1|rhs1,rhsValue1),null)")`, depending on
whether it's the left-hand or the right-hand side of the final join that
propagates first.
   
   `rejoin`  is computed this way in the test:
   
      LEFT -------.
       |          |
       |  RIGHT   |
       |    |     |
      FKJoin      |
          \       |
           \      |
           Rejoin-'

   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16682) Rewrite JassTestUtils by Java

2024-05-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16682:
--

Assignee: TengYao Chi  (was: Chia-Ping Tsai)

> Rewrite JassTestUtils by Java
> -
>
> Key: KAFKA-16682
> URL: https://issues.apache.org/jira/browse/KAFKA-16682
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
>
> as title
> one more thing is that we should change the package name from kafka.utils to 
> kafka.security



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16682) Rewrite JassTestUtils by Java

2024-05-07 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844261#comment-17844261
 ] 

TengYao Chi commented on KAFKA-16682:
-

I am able to handle this issue.

> Rewrite JassTestUtils by Java
> -
>
> Key: KAFKA-16682
> URL: https://issues.apache.org/jira/browse/KAFKA-16682
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> as title
> one more thing is that we should change the package name from kafka.utils to 
> kafka.security



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] a slight change. [kafka]

2024-05-07 Thread via GitHub


gongxuanzhang commented on PR #15812:
URL: https://github.com/apache/kafka/pull/15812#issuecomment-2098224856

   > Hi @gongxuanzhang and thank you for the contribution!
   > 
   > This is actually an issue in a lot of places, I checked with this 
checkstyle.xml rule:
   > 
   > ```
   > 
   > ```
   > 
   > and `./gradlew checkstyleMain checkstyleTest --continue`.
   > 
   > While fixing this in one place is good, it would make sense to try and fix 
this everywhere if we decide to address it at all. I've created 
https://issues.apache.org/jira/browse/KAFKA-16643 for this, and you can take 
this on if you're interested. If so, please see the contributing guide: 
https://kafka.apache.org/contributing.html and join the mailing list and JIRA.
   > 
   > module violations
   > `:streams:upgrade-system-tests-24:checkstyleTest`  1
   > `:streams:upgrade-system-tests-22:checkstyleTest`  1
   > `:streams:upgrade-system-tests-23:checkstyleTest`  1
   > `:streams:upgrade-system-tests-26:checkstyleTest`  1
   > `:streams:upgrade-system-tests-28:checkstyleTest`  1
   > `:streams:upgrade-system-tests-30:checkstyleTest`  1
   > `:streams:upgrade-system-tests-25:checkstyleTest`  1
   > `:streams:upgrade-system-tests-31:checkstyleTest`  1
   > `:streams:upgrade-system-tests-27:checkstyleTest`  1
   > `:streams:upgrade-system-tests-32:checkstyleTest`  1
   > `:streams:upgrade-system-tests-36:checkstyleTest`  1
   > `:streams:upgrade-system-tests-35:checkstyleTest`  1
   > `:streams:upgrade-system-tests-34:checkstyleTest`  1
   > `:streams:upgrade-system-tests-33:checkstyleTest`  1
   > `:streams:upgrade-system-tests-37:checkstyleTest`  1
   > `:generator:checkstyleMain`2
   > `:connect:mirror-client:checkstyleMain`1
   > `:connect:api:checkstyleMain`  1
   > `:connect:json:checkstyleMain` 1
   > `:server-common:checkstyleMain`10
   > `:raft:checkstyleMain` 11
   > `:storage:checkstyleMain`  2
   > `:trogdor:checkstyleMain`  17
   > `:server:checkstyleMain`   122
   > `:connect:mirror:checkstyleMain`   3
   > `:connect:test-plugins:checkstyleMain` 2
   > `:tools:checkstyleMain`9
   > `:storage:storage-api:checkstyleMain`  20
   > `:streams:examples:checkstyleMain` 5
   > `:streams:test-utils:checkstyleMain`   2
   > `:group-coordinator:checkstyleMain`46
   > `:metadata:checkstyleMain` 72
   > `:raft:checkstyleTest` 5
   > `:connect:runtime:checkstyleMain`  2
   > `:group-coordinator:checkstyleTest`10
   > `:server-common:checkstyleTest`4
   > `:trogdor:checkstyleTest`  3
   > `:metadata:checkstyleTest` 73
   > `:streams:test-utils:checkstyleTest`   12
   > `:streams:checkstyleMain`  57
   > `:clients:checkstyleTest`  87
   > `:clients:checkstyleMain`  122
   > `:core:checkstyleMain` 1
   > `:shell:checkstyleMain`10
   > `:core:checkstyleTest` 12
   > `:shell:checkstyleTest`2
   > `:jmh-benchmarks:checkstyleMain`   1
   > `:connect:mirror:checkstyleTest`   4
   > `:connect:runtime:checkstyleTest`  15
   > `:streams:checkstyleTest`  201
   
   Thank you for your answer!
   I will continue to follow up and contribute.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16678: Remove variable "unimplementedquorum" [kafka]

2024-05-07 Thread via GitHub


frankvicky opened a new pull request, #15879:
URL: https://github.com/apache/kafka/pull/15879

   Remove variable "unimplementedquorum" from EndToEndAuthorizationTest.scala 
and SaslEndToEndAuthorizationTest.scala
   
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]

2024-05-07 Thread via GitHub


nizhikov commented on code in PR #15873:
URL: https://github.com/apache/kafka/pull/15873#discussion_r1592313103


##
core/src/test/java/kafka/admin/ConfigCommandUnitTest.java:
##
@@ -410,6 +448,430 @@ public void testOptionEntityTypeNames() {
 doTestOptionEntityTypeNames(false);
 }
 
+@Test
+public void shouldFailIfUnrecognisedEntityTypeUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "client", "--entity-type", "not-recognised", 
"--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+}
+
+@Test
+public void shouldFailIfUnrecognisedEntityType() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "client", "--entity-type", "not-recognised", 
"--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts));
+}
+
+@Test
+public void shouldFailIfBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "A", "--entity-type", "brokers", "--alter", 
"--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+}
+
+@Test
+public void shouldFailIfBrokerEntityTypeIsNotAnInteger() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "A", "--entity-type", "brokers", "--alter", 
"--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts));
+}
+
+@Test
+public void 
shouldFailIfShortBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--broker", "A", "--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+}
+
+@Test
+public void shouldFailIfShortBrokerEntityTypeIsNotAnInteger() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--broker", "A", "--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts));
+}
+
+@Test
+public void shouldFailIfMixedEntityTypeFlagsUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "A", "--entity-type", "users", "--client", "B", 
"--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfMixedEntityTypeFlags() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "A", "--entity-type", "users", "--client", "B", 
"--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfInvalidHost() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "A,B", "--entity-type", "ips", "--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfInvalidHostUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "A,B", "--entity-type", "ips", "--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfUnresolvableHost() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "RFC2606.invalid", "--entity-type", "ips", 

Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-07 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1592297330


##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java:
##
@@ -55,7 +57,7 @@ public class TieredStorageTestUtils {
 
 // Log cleanup interval is configured to be 500 ms. We need to wait at 
least that amount of time before
 // segments eligible for deletion gets physically removed.
-public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5;
+public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10;

Review Comment:
   Yes, I think the test is flaky because the CI environment is quite slow, and
maybe the IO is slower than we thought. From the log I can see that at
`08:59:27,187` we copied `0002.log` to remote, and about 1 second after
`0002.log` was copied to remote (`08:59:28,300`), it timed out and all
resources started to get closed.
   
   I waited 10 seconds for the log deletion to complete, but obviously that's
not enough for the CI env. I've increased it to 20 seconds to see if that fixes
the issue. I think we've done what we can to make it faster (i.e. set the
configs to speed up the tests).
   
   ```
   [2024-05-04 08:59:27,187] INFO [RemoteLogManager=0 
partition=DcnVRVRSQd675ZLtCIn21A:topicB-0] Copied 0002.log to 
remote storage with segment-id: 
RemoteLogSegmentId{topicIdPartition=DcnVRVRSQd675ZLtCIn21A:topicB-0, 
id=gcVp790dRlmFCr_0tN0NTg} (kafka.log.remote.RemoteLogManager$RLMTask:792)
   
   [2024-05-04 08:59:28,300] INFO Closing topic-based RLMM resources 
   [2024-05-04 08:59:28,304] INFO Closing the instance 
(org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask:328)
   [2024-05-04 08:59:28,308] INFO Exited from consumer task thread 
(org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask:151)
   ```
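   
   For such waits, a polling helper is usually preferable to a fixed timeout
bump. A rough sketch, assuming Kafka's test helper
`org.apache.kafka.test.TestUtils.waitForCondition`; `remoteSegmentsDeleted()`
and the 20s bound are illustrative:
   
   ```java
   import org.apache.kafka.test.TestUtils;
   
   class RemoteDeletionWaitSketch {
       void awaitRemoteDeletion() throws InterruptedException {
           // Poll until the deletion is observed instead of sleeping a fixed time,
           // so slow CI runs only cost extra time when they actually need it.
           TestUtils.waitForCondition(
               this::remoteSegmentsDeleted,  // hypothetical check on remote storage
               20_000L,
               "Timed out waiting for remote log segments to be deleted");
       }
   
       private boolean remoteSegmentsDeleted() { return true; } // stand-in
   }
   ```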



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Revoke tasks from state updater with new remove [kafka]

2024-05-07 Thread via GitHub


cadonna commented on code in PR #15871:
URL: https://github.com/apache/kafka/pull/15871#discussion_r1592297149


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -623,6 +623,21 @@ private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,

Review Comment:
   Yes, we are going to re-use it. Stay tuned!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16684) FetchResponse#responseData could return incorrect data

2024-05-07 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844251#comment-17844251
 ] 

Chia-Ping Tsai commented on KAFKA-16684:


After 
[https://github.com/apache/kafka/commit/2b8aff58b575c199ee8372e5689420c9d77357a5]
 , I don't think the "cache" is useful. Hence, we can just remove the cache to 
fix this potential bug

> FetchResponse#responseData could return incorrect data
> --
>
> Key: KAFKA-16684
> URL: https://issues.apache.org/jira/browse/KAFKA-16684
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> [https://github.com/apache/kafka/commit/2b8aff58b575c199ee8372e5689420c9d77357a5]
>  made it accept input to return "partial" data. The content of the output is
> based on the input, but we cache the output, so it will return the same
> output even though we pass different input. That is a potential bug.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16684) FetchResponse#responseData could return incorrect data

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16684:
--

 Summary: FetchResponse#responseData could return incorrect data
 Key: KAFKA-16684
 URL: https://issues.apache.org/jira/browse/KAFKA-16684
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


[https://github.com/apache/kafka/commit/2b8aff58b575c199ee8372e5689420c9d77357a5]
 made it accept input to return "partial" data. The content of the output is
based on the input, but we cache the output, so it will return the same output
even though we pass different input. That is a potential bug.
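
A minimal sketch of the problematic caching pattern, with illustrative names
(not the actual Kafka code):

{code:java}
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

class CachedResponseSketch {
    private Map<String, Integer> cachedResponse; // computed once, then reused

    // The output should depend on `topicNames`, but only the first call's
    // input wins: a later call with a different set silently gets stale data.
    Map<String, Integer> responseData(final Set<String> topicNames) {
        if (cachedResponse == null) {
            cachedResponse = topicNames.stream()
                .collect(Collectors.toMap(Function.identity(), String::length));
        }
        return cachedResponse;
    }
}
{code}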



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]

2024-05-07 Thread via GitHub


nizhikov commented on code in PR #15873:
URL: https://github.com/apache/kafka/pull/15873#discussion_r1592249836


##
core/src/test/java/kafka/admin/ConfigCommandUnitTest.java:
##
@@ -418,4 +880,77 @@ public static String[] toArray(String... first) {
 public static String[] toArray(List<String>... lists) {
 return Stream.of(lists).flatMap(List::stream).toArray(String[]::new);
 }
+
+@SafeVarargs
+public static List<String> concat(List<String>... lists) {
+return Stream.of(lists).flatMap(List::stream).collect(Collectors.toList());
+}
+
+@SafeVarargs
+public static <K, V> Map<K, V> concat(Map<K, V>... maps) {
+Map<K, V> res = new HashMap<>();

Review Comment:
   Thanks. 
   ~~Used your version of the method.~~
   
   Actually, we can't use this version (I had a similar one at first), because
some values of the concatenated maps are null, and the `merge` function used in
`Collectors.toMap` checks that `value` is not null:
   
   ```
   @Override
   public V merge(K key, V value,
   BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
   if (value == null)
   throw new NullPointerException();
   if (remappingFunction == null)
   throw new NullPointerException();
   ...
   ``` 
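   
   A compact illustration of the failure and a null-tolerant workaround
(hypothetical maps, not the PR's actual code):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.stream.Collectors;
   
   public class ConcatNullValuesDemo {
       public static void main(String[] args) {
           Map<String, String> a = new HashMap<>();
           a.put("k1", null); // null value, as in some of the concatenated maps
   
           // Throws NullPointerException: Collectors.toMap goes through
           // Map.merge, which rejects null values.
           // a.entrySet().stream()
           //     .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   
           // Works: plain putAll()/put() accept null values.
           Map<String, String> res = new HashMap<>();
           res.putAll(a);
           System.out.println(res); // prints {k1=null}
       }
   }
   ```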



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12385) Remove FetchResponse#responseData

2024-05-07 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844248#comment-17844248
 ] 

Chia-Ping Tsai commented on KAFKA-12385:


It seems to me `FetchResponse#responseData` is a weird function. The content of
the output is based on the input, but we cache the output, so it will return
the same output even though we pass different input. That is a potential bug.

 

 

> Remove FetchResponse#responseData
> -
>
> Key: KAFKA-12385
> URL: https://issues.apache.org/jira/browse/KAFKA-12385
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> reference to [https://github.com/apache/kafka/pull/9758#discussion_r584142074]
> We can rewrite related code to avoid using stale data structure.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-10199: Revoke tasks from state updater with new remove [kafka]

2024-05-07 Thread via GitHub


lucasbru commented on code in PR #15871:
URL: https://github.com/apache/kafka/pull/15871#discussion_r1592261128


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -623,6 +623,21 @@ private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,

Review Comment:
   Are we going to reuse this method? Otherwise, I find this a bit too 
abstract, and I'd just inline it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]

2024-05-07 Thread via GitHub


nizhikov commented on code in PR #15873:
URL: https://github.com/apache/kafka/pull/15873#discussion_r1592249836


##
core/src/test/java/kafka/admin/ConfigCommandUnitTest.java:
##
@@ -418,4 +880,77 @@ public static String[] toArray(String... first) {
 public static String[] toArray(List<String>... lists) {
 return Stream.of(lists).flatMap(List::stream).toArray(String[]::new);
 }
+
+@SafeVarargs
+public static List<String> concat(List<String>... lists) {
+return Stream.of(lists).flatMap(List::stream).collect(Collectors.toList());
+}
+
+@SafeVarargs
+public static <K, V> Map<K, V> concat(Map<K, V>... maps) {
+Map<K, V> res = new HashMap<>();

Review Comment:
   Thanks. 
   Used your version of the method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14588 [2/N] ConfigCommandTest rewritten in java [kafka]

2024-05-07 Thread via GitHub


nizhikov commented on code in PR #15873:
URL: https://github.com/apache/kafka/pull/15873#discussion_r1592246946


##
core/src/test/java/kafka/admin/ConfigCommandUnitTest.java:
##
@@ -410,6 +448,430 @@ public void testOptionEntityTypeNames() {
 doTestOptionEntityTypeNames(false);
 }
 
+@Test
+public void shouldFailIfUnrecognisedEntityTypeUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "client", "--entity-type", "not-recognised", 
"--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+}
+
+@Test
+public void shouldFailIfUnrecognisedEntityType() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "client", "--entity-type", "not-recognised", 
"--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts));
+}
+
+@Test
+public void shouldFailIfBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "A", "--entity-type", "brokers", "--alter", 
"--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+}
+
+@Test
+public void shouldFailIfBrokerEntityTypeIsNotAnInteger() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "A", "--entity-type", "brokers", "--alter", 
"--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts));
+}
+
+@Test
+public void 
shouldFailIfShortBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--broker", "A", "--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
+}
+
+@Test
+public void shouldFailIfShortBrokerEntityTypeIsNotAnInteger() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--broker", "A", "--alter", "--add-config", "a=b,c=d"});
+assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts));
+}
+
+@Test
+public void shouldFailIfMixedEntityTypeFlagsUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "A", "--entity-type", "users", "--client", "B", 
"--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfMixedEntityTypeFlags() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "A", "--entity-type", "users", "--client", "B", 
"--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfInvalidHost() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "A,B", "--entity-type", "ips", "--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfInvalidHostUsingZookeeper() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
+"--entity-name", "A,B", "--entity-type", "ips", "--describe"});
+assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
+}
+
+@Test
+public void shouldFailIfUnresolvableHost() {
+ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", 
"localhost:9092",
+"--entity-name", "RFC2606.invalid", "--entity-type", "ips", 

Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-07 Thread via GitHub


clolov commented on PR #15673:
URL: https://github.com/apache/kafka/pull/15673#issuecomment-2097975706

   Heya @junrao! Let me know your thoughts on my responses. I have further
updated the items below and rebased. I am waiting on the tests to finish
running, and if they uncover something I will aim to remedy it today.
   
   MetadataVersionTest - I added 3.8-IV1 where I believe it is needed
   QuorumControllerTest - I did not add 3.8-IV1 because I believe the 
references to 3.8-IV0 are to test the ELR feature
   PartitionRegistrationTest - Updated
   BrokerMetadataPublisherTest - Updated
   ZkMigrationIntegrationTest - Updated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16679) Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, `FeatureCommandUnitTest` into

2024-05-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16679:
--

Assignee: Cheng-Kai, Zhang  (was: Chia-Ping Tsai)

> Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, 
> `FeatureCommandUnitTest` into 
> 
>
> Key: KAFKA-16679
> URL: https://issues.apache.org/jira/browse/KAFKA-16679
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Cheng-Kai, Zhang
>Priority: Minor
>
> Normally, we don't put multiple test classes into a single file. Those test
> classes can be extracted into new class files. Or we can merge them into a
> single class by using the "@Test" annotation. That lets those test cases run
> without an embedded cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-07 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2097869168

   @junrao @satishd @chia7712 @showuon 
   
   Updated the test plan in the summary. Verified that the patch fixes the
issue by running both the trunk and patched builds. With the fix, the
high-watermark value gets updated to the valid offset. Please take a look when
you get a chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-07 Thread via GitHub


clolov commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1592116050


##
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##
@@ -70,7 +70,7 @@ else if (isolationLevel == IsolationLevel.READ_COMMITTED)
 minVersion = 2;
 else if (requireTimestamp)
 minVersion = 1;
-return new Builder(minVersion, 
ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
+return new Builder(minVersion, 
ApiKeys.LIST_OFFSETS.latestVersion(false), CONSUMER_REPLICA_ID, isolationLevel);

Review Comment:
   We cannot test the latest unstable version in the client, correct. This
should only be temporary until I release the subsequent pull request, where we
introduce a correct OffsetSpec to allow clients to call this behaviour
correctly and introduce tests for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-07 Thread via GitHub


clolov commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1592114443


##
core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala:
##
@@ -219,7 +219,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 TestUtils.generateAndProduceMessages(servers, topic, 9)
 TestUtils.produceMessage(servers, topic, "test-10", 
System.currentTimeMillis() + 10L)
 
-for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to 
ApiKeys.LIST_OFFSETS.latestVersion) {
+for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to 
ApiKeys.LIST_OFFSETS.latestVersion(false)) {

Review Comment:
   I want to hide the latest unstable version of the ListOffsetRequest
everywhere. A follow-up pull request will allow it to be called and introduce
the changes needed for the client to call this with a newer OffsetSpec, plus a
test confirming the behaviour. Does this make sense, or am I misunderstanding
the question?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16679) Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, `FeatureCommandUnitTest` into

2024-05-07 Thread Cheng-Kai, Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844222#comment-17844222
 ] 

Cheng-Kai, Zhang commented on KAFKA-16679:
--

Hi [~chia7712] , could I work on this one? :D

> Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, 
> `FeatureCommandUnitTest` into 
> 
>
> Key: KAFKA-16679
> URL: https://issues.apache.org/jira/browse/KAFKA-16679
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> Normally, we don't put multiple test classes into a single file. Those test
> classes can be extracted into new class files. Or we can merge them into a
> single class by using the "@Test" annotation. That lets those test cases run
> without an embedded cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Allowing WriteTxnMarkers API to run with AlterCluster permissions [kafka]

2024-05-07 Thread via GitHub


sidyag commented on code in PR #15837:
URL: https://github.com/apache/kafka/pull/15837#discussion_r1592106849


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2822,6 +2822,31 @@ class KafkaApisTest extends Logging {
   () => kafkaApis.handleWriteTxnMarkersRequest(null, 
RequestLocal.withThreadConfinedCaching))
   }
 
+  @Test
+  def requiredAclsNotPresentWriteTxnMarkersThrowsAuthorizationException(): 
Unit = {

Review Comment:
   That is the happy case path verified by existing tests. As mocks are not 
present there, by default the CLUSTER_ACTION check doesn't throw an exception, 
and the ALTER check returns false.
   
   I can modify the existing tests to make that explicit and duplicate it to 
test for the second scenario.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Allowing WriteTxnMarkers API to run with AlterCluster permissions [kafka]

2024-05-07 Thread via GitHub


sidyag commented on code in PR #15837:
URL: https://github.com/apache/kafka/pull/15837#discussion_r1592106849


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2822,6 +2822,31 @@ class KafkaApisTest extends Logging {
   () => kafkaApis.handleWriteTxnMarkersRequest(null, 
RequestLocal.withThreadConfinedCaching))
   }
 
+  @Test
+  def requiredAclsNotPresentWriteTxnMarkersThrowsAuthorizationException(): 
Unit = {

Review Comment:
   That is the happy case path verified by existing tests. As mocks are not 
present there, by default the CLUSTER_ACTION check doesn't throw an exception, 
and the ALTER check returns false.
   
   I can modify the existing tests to make that explicit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16676: Add missing RPCs to security.html [kafka]

2024-05-07 Thread via GitHub


AndrewJSchofield opened a new pull request, #15878:
URL: https://github.com/apache/kafka/pull/15878

   KIP-714 and KIP-1000 introduced 3 new RPCs. These new RPCs were not added to 
docs/security.html to document the authorization checks required to perform the 
requests.
   
   While I was adding these RPCs, I noticed that a few other recent RPCs had
also not been added, so I added those too.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16683) Extract security-related helpers from scala.TestUtils to java class

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16683:
--

 Summary: Extract security-related helpers from scala.TestUtils to 
java class
 Key: KAFKA-16683
 URL: https://issues.apache.org/jira/browse/KAFKA-16683
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


We can merge them into `JaasTestUtils` and then rename `JaasTestUtils` to
`SecurityTestUtils`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16680) Make ClusterTestExtensions support SASL

2024-05-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16680:
---
Description: 
This is an umbrella issue.

In order to migrate more tests to the new test infra, we ought to make it
support SASL at least.

*phase1: reuse/rewrite existent SASL utils by Java*
 # MiniKdc
 # JaasTestUtils
 # Move security-related helpers from scala.TestUtils
 # extract/rewrite non-zk code from SaslSetup to new java class

*phase2: make `ClusterTest#securityProtocol` work. It does not work for kraft
mode :(*
 # add client-related helper to generate consumer/producer/admin class with 
security configs
 # configure kraft server with security settings
 # migrate tests of tools to use new test infra with security

 

  was:
This is an umbrella issue.

In order to migrate more tests to the new test infra, we ought to make it
support SASL at least.

*phase1: reuse/rewrite existent SASL utils by Java*
 # MiniKdc
 # JaasTestUtils
 # Move security-related helpers from scala.TestUtils to java.TestUtils
 # extract/rewrite non-zk code from SaslSetup to new java class

*phase2: make `ClusterTest#securityProtocol` work. It does not work for kraft
mode :(*
 # add client-related helper to generate consumer/producer/admin class with 
security configs
 # configure kraft server with security settings
 # migrate tests of tools to use new test infra with security

 


> Make ClusterTestExtensions support SASL
> ---
>
> Key: KAFKA-16680
> URL: https://issues.apache.org/jira/browse/KAFKA-16680
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> This is an umbrella issue.
> In order to migrate more tests to the new test infra, we ought to make it
> support SASL at least.
> *phase1: reuse/rewrite existent SASL utils by Java*
>  # MiniKdc
>  # JaasTestUtils
>  # Move security-related helpers from scala.TestUtils
>  # extract/rewrite non-zk code from SaslSetup to new java class
> *phase2: make `ClusterTest#securityProtocol` work. It does not work for
> kraft mode :(*
>  # add client-related helper to generate consumer/producer/admin class with 
> security configs
>  # configure kraft server with security settings
>  # migrate tests of tools to use new test infra with security
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16682) Rewrite JassTestUtils by Java

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16682:
--

 Summary: Rewrite JassTestUtils by Java
 Key: KAFKA-16682
 URL: https://issues.apache.org/jira/browse/KAFKA-16682
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


as title

one more thing is that we should change the package name from kafka.utils to 
kafka.security



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16681) Rewrite MiniKDC by Java

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16681:
--

 Summary: Rewrite MiniKDC by Java
 Key: KAFKA-16681
 URL: https://issues.apache.org/jira/browse/KAFKA-16681
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Noted:
 # we need to move it from the scala folder to the java folder
 # don't change the package name since the system tests require it



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16665: Allow to initialize newly assigned partition's positions without allowing fetching while callback runs [kafka]

2024-05-07 Thread via GitHub


lucasbru merged PR #15856:
URL: https://github.com/apache/kafka/pull/15856


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close

2024-05-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16589:
---
Parent: KAFKA-16680
Issue Type: Sub-task  (was: Improvement)

> Consider removing `ClusterInstance#createAdminClient` since callers are not 
> sure whether they need to call close
> 
>
> Key: KAFKA-16589
> URL: https://issues.apache.org/jira/browse/KAFKA-16589
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>
> Sometimes we close the admin created by `createAdminClient`, and sometimes we 
> don't. That is not a true problem since the `ClusterInstance` will call 
> `close` when stopping.
> However, that causes a lot of inconsistent code, and in fact it does not save
> much time since creating an Admin is not hard work. We can get
> `bootstrapServers` and `bootstrapControllers` from `ClusterInstance` easily.
>  
> {code:java}
> // before
> try (Admin admin = cluster.createAdminClient()) { }
> // after v0
> try (Admin admin = Admin.create(Collections.singletonMap(
> CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
> cluster.bootstrapServers( {}
> {code}
> Personally, the `after` version is not verbose, but we can have alternatives: 
> `Map<String, Object> clientConfigs`.
>  
> {code:java}
> // after v1
> try (Admin admin = Admin.create(cluster.clientConfigs())) {}{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16680) Make ClusterTestExtensions support SASL

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16680:
--

 Summary: Make ClusterTestExtensions support SASL
 Key: KAFKA-16680
 URL: https://issues.apache.org/jira/browse/KAFKA-16680
 Project: Kafka
  Issue Type: New Feature
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


This is an umbrella issue.

In order to migrate more tests to the new test infra, we ought to make it
support SASL at least.

*phase1: reuse/rewrite existent SASL utils by Java*
 # MiniKdc
 # JaasTestUtils
 # Move security-related helpers from scala.TestUtils to java.TestUtils
 # extract/rewrite non-zk code from SaslSetup to new java class

*phase2: make `ClusterTest#securityProtocol` work. It does not work for kraft
mode :(*
 # add client-related helper to generate consumer/producer/admin class with 
security configs
 # configure kraft server with security settings
 # migrate tests of tools to use new test infra with security

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Remove dev_version parameter from streams tests [kafka]

2024-05-07 Thread via GitHub


lucasbru commented on PR #15874:
URL: https://github.com/apache/kafka/pull/15874#issuecomment-2097704867

   @mjsax It does change, because we need to keep a list of tests (with 
parameters) that do not work with ARM. The `from_version` parameter does not 
change, because only old versions are affected. But the `to_version` parameter
changes with every minor release. After removing the `to_version` parameter, we
won't have to change the list again. Does that clarify the intent here?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Remove lost tasks in state updater with new remove [kafka]

2024-05-07 Thread via GitHub


lucasbru commented on code in PR #15870:
URL: https://github.com/apache/kafka/pull/15870#discussion_r1591960681


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -602,6 +608,47 @@ private void removeUnusedTaskFromStateUpdater(final TaskId 
taskId) {
 tasks.addPendingTaskToCloseClean(taskId);
 }
 
+private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+   final Set<Task> tasksToCloseCleanFromStateUpdater,
+   final Set<Task> tasksToCloseDirtyFromStateUpdater) {
+iterateAndActOnFuture(futures, removedTaskResult -> {
+final Task task = removedTaskResult.task();
+final Optional<RuntimeException> exception = removedTaskResult.exception();
+if (exception.isPresent()) {
+tasksToCloseDirtyFromStateUpdater.add(task);
+} else {
+tasksToCloseCleanFromStateUpdater.add(task);
+}
+});
+}
+
+private void iterateAndActOnFuture(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
+   final java.util.function.Consumer<StateUpdater.RemovedTaskResult> action) {
+for (final Map.Entry<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> entry : futures.entrySet()) {
+final TaskId taskId = entry.getKey();
+final CompletableFuture<StateUpdater.RemovedTaskResult> future = entry.getValue();
+try {
+final StateUpdater.RemovedTaskResult removedTaskResult = waitForFuture(taskId, future);
+action.accept(removedTaskResult);
+} catch (final ExecutionException executionException) {
+log.warn("An exception happened when removing task {} from the state updater. The exception will be handled later: ",
+taskId, executionException);
+} catch (final InterruptedException ignored) { }

Review Comment:
   Sounds good!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16677) Replace ClusterType#ALL and ClusterType#DEFAULT by Array

2024-05-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16677:
--

Assignee: PoAn Yang  (was: Chia-Ping Tsai)

> Replace ClusterType#ALL and ClusterType#DEFAULT by Array
> 
>
> Key: KAFKA-16677
> URL: https://issues.apache.org/jira/browse/KAFKA-16677
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>
> Both ClusterType#ALL and ClusterType#DEFAULT are a kind of "tag" instead of a
> true "type". It seems to me they can be removed by using an Array. For example:
> ClusterType#ALL -> {Type.ZK, Type.KRAFT, Type.CO_KRAFT}
> ClusterType#DEFAULT -> {}
> There are two benefits:
> 1. That is more readable for the "ALL" type.
> 2. We don't throw the awkward "exception" when seeing "DEFAULT".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16679) Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, `FeatureCommandUnitTest` into

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16679:
--

 Summary: Merge `DeleteRecordsCommandUnitTest` into 
`DeleteRecordsCommandTest`, `FeatureCommandUnitTest` into 
 Key: KAFKA-16679
 URL: https://issues.apache.org/jira/browse/KAFKA-16679
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Normally, we don't put multiple test classes into a single file. Those test
classes can be extracted into new class files. Or we can merge them into a
single class by using the "@Test" annotation. That lets those test cases run
without an embedded cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16677) Replace ClusterType#ALL and ClusterType#DEFAULT by Array

2024-05-07 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844163#comment-17844163
 ] 

PoAn Yang commented on KAFKA-16677:
---

Hi [~chia7712], I'm interested in this feature. If you're not working on it, 
may I assign to myself? Thank you.

> Replace ClusterType#ALL and ClusterType#DEFAULT by Array
> 
>
> Key: KAFKA-16677
> URL: https://issues.apache.org/jira/browse/KAFKA-16677
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> Both ClusterType#ALL and ClusterType#DEFAULT are a kind of "tag" instead of a
> true "type". It seems to me they can be removed by using an Array. For example:
> ClusterType#ALL -> {Type.ZK, Type.KRAFT, Type.CO_KRAFT}
> ClusterType#DEFAULT -> {}
> There are two benefits:
> 1. That is more readable for the "ALL" type.
> 2. We don't throw the awkward "exception" when seeing "DEFAULT".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16615) JoinGroup API for upgrading ConsumerGroup

2024-05-07 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16615.
-
Fix Version/s: 3.8.0
 Reviewer: David Jacot
 Assignee: Dongnuo Lyu
   Resolution: Fixed

> JoinGroup API for upgrading ConsumerGroup
> -
>
> Key: KAFKA-16615
> URL: https://issues.apache.org/jira/browse/KAFKA-16615
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-05-07 Thread via GitHub


dajac merged PR #15798:
URL: https://github.com/apache/kafka/pull/15798


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16678) Remove unimplementedquorum from EndToEndAuthorizationTest

2024-05-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16678:
--

Assignee: TengYao Chi  (was: Chia-Ping Tsai)

> Remove unimplementedquorum from EndToEndAuthorizationTest
> -
>
> Key: KAFKA-16678
> URL: https://issues.apache.org/jira/browse/KAFKA-16678
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
>
> `unimplementedquorum`[0] is used to skip test cases if they don't support
> running with KRaft. However, KAFKA-15219, KAFKA-14765 and KAFKA-14776 made
> the related tests runnable with KRaft.
> In short, it is time to remove the unused variable :)
> [0] 
> [https://github.com/apache/kafka/blob/d76352e2151178521dc447e3406dabb8fcd4c57c/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala#L146]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16678) Remove unimplementedquorum from EndToEndAuthorizationTest

2024-05-07 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844157#comment-17844157
 ] 

TengYao Chi commented on KAFKA-16678:
-

I am able to handle this issue.

> Remove unimplementedquorum from EndToEndAuthorizationTest
> -
>
> Key: KAFKA-16678
> URL: https://issues.apache.org/jira/browse/KAFKA-16678
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> `unimplementedquorum`[0] is used to skip test cases if they don't support
> running with KRaft. However, KAFKA-15219, KAFKA-14765 and KAFKA-14776 made
> the related tests runnable with KRaft.
> In short, it is time to remove the unused variable :)
> [0] 
> [https://github.com/apache/kafka/blob/d76352e2151178521dc447e3406dabb8fcd4c57c/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala#L146]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16678) Remove unimplementedquorum from EndToEndAuthorizationTest

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16678:
--

 Summary: Remove unimplementedquorum from EndToEndAuthorizationTest
 Key: KAFKA-16678
 URL: https://issues.apache.org/jira/browse/KAFKA-16678
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


`unimplementedquorum`[0] is used to skip test cases if they don't support
running with KRaft. However, KAFKA-15219, KAFKA-14765 and KAFKA-14776 made the
related tests runnable with KRaft.

In short, it is time to remove the unused variable :)

[0] 
[https://github.com/apache/kafka/blob/d76352e2151178521dc447e3406dabb8fcd4c57c/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala#L146]

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Kafka-16668: improve cluster test [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on code in PR #15861:
URL: https://github.com/apache/kafka/pull/15861#discussion_r1591866899


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -306,11 +300,16 @@ public Builder setPerBrokerProperties(Map> perBroke
 return this;
 }
 
+public Builder setTags(String[] tags) {

Review Comment:
   We use `Array` in the annotation, but it would be better to use `List` here.
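   
   A rough sketch of the suggested shape (illustrative only, not the final PR
code; assumes `java.util.List`, `java.util.Objects`, and a `List<String> tags`
field on the builder):
   
   ```java
   public Builder setTags(List<String> tags) {
       this.tags = Objects.requireNonNull(tags);
       return this;
   }
   ```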



##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -153,15 +148,19 @@ public Map> 
perBrokerOverrideProperties() {
 return perBrokerOverrideProperties;
 }
 
-public Map nameTags() {
-Map tags = new LinkedHashMap<>(4);
-name().ifPresent(name -> tags.put("Name", name));
-tags.put("MetadataVersion", metadataVersion.toString());
-tags.put("Security", securityProtocol.name());
-listenerName().ifPresent(listener -> tags.put("Listener", listener));
+public String[] tags() {
 return tags;
 }
 
+public Map nameTags() {

Review Comment:
   Could you please add the `tags` to this method? Also, it would be great to 
rename it to "displayName", and the return type should be `String` instead of 
`Map`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16668: improve cluster test [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on PR #15861:
URL: https://github.com/apache/kafka/pull/15861#issuecomment-2097528853

   Now that #15766 is merged, could you please fix the build error?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16677) Replace ClusterType#ALL and ClusterType#DEFAULT by Array

2024-05-07 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844150#comment-17844150
 ] 

PoAn Yang commented on KAFKA-16677:
---

Hi [~chia7712], I think this feature is good. Most of the time, we use the 
same config to run KRAFT and CO_KRAFT. If we can use `setTypes`, we don't need 
to create another builder.
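
For example, the replacement could look like this at a test site; a sketch with hypothetical stand-in types, not Kafka's actual classes:

{code:java}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-ins to illustrate the idea; not Kafka's actual classes.
enum Type { ZK, KRAFT, CO_KRAFT }

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ClusterTest {
    Type[] types() default {}; // an empty array plays the old ClusterType#DEFAULT role
}

class ClusterTypeArraySketch {
    @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}) // replaces ClusterType#ALL
    void runsOnEveryClusterType() { }
}
{code}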

> Replace ClusterType#ALL and ClusterType#DEFAULT by Array
> 
>
> Key: KAFKA-16677
> URL: https://issues.apache.org/jira/browse/KAFKA-16677
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> Both ClusterType#ALL and ClusterType#DEFAULT are a kind of "tag" rather than 
> a true "type". It seems to me they can be replaced by an Array. For example:
> ClusterType#ALL -> {Type.ZK, Type.KRAFT, Type.CO_KRAFT}
> ClusterType#DEFAULT -> {}
> There are two benefits:
> 1. An explicit array is more readable than the "ALL" type.
> 2. We no longer throw the awkward "exception" when encountering "DEFAULT".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: log newly created processId [kafka]

2024-05-07 Thread via GitHub


chia7712 merged PR #15851:
URL: https://github.com/apache/kafka/pull/15851


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16677) Replace ClusterType#ALL and ClusterType#DEFAULT by Array

2024-05-07 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844148#comment-17844148
 ] 

Chia-Ping Tsai commented on KAFKA-16677:


Additionally, we can simplify the code 
(https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java#L77)


{code:java}
// Some comments here
ClusterConfig raftWithNewGroupCoordinator = ClusterConfig.defaultBuilder()
    .setTypes(Arrays.asList(KRAFT, CO_KRAFT))
    .setName("newGroupCoordinator")
    .setServerProperties(serverProperties)
    .build();
{code}


> Replace ClusterType#ALL and ClusterType#DEFAULT by Array
> 
>
> Key: KAFKA-16677
> URL: https://issues.apache.org/jira/browse/KAFKA-16677
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> Both ClusterType#ALL and ClusterType#DEFAULT are a kind of "tag" rather than 
> a true "type". It seems to me they can be replaced by an Array. For example:
> ClusterType#ALL -> {Type.ZK, Type.KRAFT, Type.CO_KRAFT}
> ClusterType#DEFAULT -> {}
> There are two benefits:
> 1. An explicit array is more readable than the "ALL" type.
> 2. We no longer throw the awkward "exception" when encountering "DEFAULT".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Correct connector scheduled rebalance logs [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on code in PR #15875:
URL: https://github.com/apache/kafka/pull/15875#discussion_r1591859505


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1768,7 +1768,7 @@ private boolean handleRebalanceCompleted() {
 long now = time.milliseconds();
 if (scheduledRebalance <= now) {
 log.debug("Requesting rebalance because scheduled rebalance 
timeout has been reached "
-+ "(now: {} scheduledRebalance: {}", scheduledRebalance, 
now);
++ "(now: {} scheduledRebalance: {}", now, 
scheduledRebalance);

Review Comment:
   Could you please remove `(` ?
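
With both fixes applied (argument order corrected and the stray parenthesis dropped), the statement might read as below; a sketch assuming the suggestion is taken, not the merged code:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the corrected log call; "RebalanceLogSketch" is a stand-in class.
class RebalanceLogSketch {
    private static final Logger log = LoggerFactory.getLogger(RebalanceLogSketch.class);

    void logTimeout(long now, long scheduledRebalance) {
        log.debug("Requesting rebalance because scheduled rebalance timeout has been reached "
                + "now: {} scheduledRebalance: {}", now, scheduledRebalance);
    }
}
```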



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16593) Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions

2024-05-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16593.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions 
> --
>
> Key: KAFKA-16593
> URL: https://issues.apache.org/jira/browse/KAFKA-16593
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
> Fix For: 3.8.0
>
>
> As the title says: the test is in the tools module and does not need to 
> depend on the test code of the core module directly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-05-07 Thread via GitHub


chia7712 merged PR #15766:
URL: https://github.com/apache/kafka/pull/15766


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16629: Add broker-related tests to ConfigCommandIntegrationTest [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on code in PR #15840:
URL: https://github.com/apache/kafka/pull/15840#discussion_r1591853150


##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -171,120 +144,352 @@ public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
 KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
 
 String brokerId = "1";
-adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
-alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter");
+AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
+alterOpts = asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter");
 
 // Add config
-alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "11"), Optional.of(brokerId));
-alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "12"), Optional.empty());
+alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), singletonMap("message.max.size", "11"));
+alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(), singletonMap("message.max.size", "12"));
 
 // Change config
-alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "13"), Optional.of(brokerId));
-alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "14"), Optional.empty());
+alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), singletonMap("message.max.size", "13"));
+alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(), singletonMap("message.max.size", "14"));
 
 // Delete config
-deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.of(brokerId));
-deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.empty());
+deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), singleton("message.max.size"));
+deleteAndVerifyConfig(zkClient, adminZkClient, Optional.empty(), singleton("message.max.size"));
 
 // Listener configs: should work only with listener name
-alterAndVerifyConfig(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId));
+alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"));
 assertThrows(ConfigException.class,
-() -> alterConfigWithZk(zkClient, Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId)));
+() -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), singletonMap("ssl.keystore.location", "/tmp/test.jks")));
 
 // Per-broker config configured at default cluster-level should fail
 assertThrows(ConfigException.class,
-() -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.empty()));
-deleteAndVerifyConfig(zkClient, Collections.singleton("listener.name.external.ssl.keystore.location"), Optional.of(brokerId));
+() -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(), singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")));
+deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), singleton("listener.name.external.ssl.keystore.location"));
 
 // Password config update without encoder secret should fail
 assertThrows(IllegalArgumentException.class,
-() -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.password", "secret"), Optional.of(brokerId)));
+() -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), singletonMap("listener.name.external.ssl.keystore.password", "secret")));
 
 // Password config update with encoder secret should succeed and encoded password must be stored in ZK
 Map<String, String> configs = new HashMap<>();
 configs.put("listener.name.external.ssl.keystore.password", "secret");
 configs.put("log.cleaner.threads", "2");
-Map<String, String> encoderConfigs = Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
-alterConfigWithZk(zkClient, configs, Optional.of(brokerId), encoderConfigs);
+Map<String, String> encoderConfigs = new HashMap<>(configs);
+
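
The hunk is cut off by the archive at this point. For orientation, a hedged sketch of the helper shape the refactored calls above imply (parameter names and types are guesses, not the PR's actual helper):

```java
import java.util.Map;
import java.util.Optional;

import static java.util.Collections.singletonMap;

// Guessed signature from the refactored call sites; not the PR's actual helper.
class ConfigHelperSketch {
    void alterAndVerifyConfig(Object zkClient, Object adminZkClient,
                              Optional<String> brokerId, Map<String, String> config) {
        // ... alter the config through ZooKeeper and verify it round-trips ...
    }

    void example() {
        // The entity (broker id) now comes before the config map at every call site.
        alterAndVerifyConfig(null, null, Optional.of("1"), singletonMap("message.max.size", "11"));
    }
}
```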
