[GitHub] [kafka] akhileshchg commented on a diff in pull request #12106: KAFKA-13861: Fix the validateOnly behavior for CreatePartitions requests in KRaft mode

2022-05-02 Thread GitBox


akhileshchg commented on code in PR #12106:
URL: https://github.com/apache/kafka/pull/12106#discussion_r863475611


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1364,7 +1364,8 @@ Boolean isBrokerUnfenced(int brokerId) {
 setErrorCode(apiError.error().code()).
 setErrorMessage(apiError.message()));
 }
-return new ControllerResult<>(records, results, true);
+log.debug("CreatePartitions result(s): {}", results);

Review Comment:
   I'll put a debug log in the controller layer before processing the final 
list of topics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] akhileshchg commented on a diff in pull request #12106: KAFKA-13861: Fix the validateOnly behavior for CreatePartitions requests in KRaft mode

2022-05-02 Thread GitBox


akhileshchg commented on code in PR #12106:
URL: https://github.com/apache/kafka/pull/12106#discussion_r863474931


##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -499,23 +504,28 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic2).get,
 () => s"$desc: Expect InvalidPartitionsException when requesting a 
noop")
   assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
-  assertEquals("Topic already has 3 partitions.", e.getCause.getMessage, 
desc)
+  exceptionMsgStr = if (TestInfoUtils.isKRaft(testInfo)) {
+"Topic already has 3 partition(s)."
+  } else {
+"Topic already has 3 partitions."

Review Comment:
   There are a lot of error messages in the `ReplicationControl` layer, and I'm 
not sure how many other layers would need to change. I think unifying them can 
be a separate PR, so the existing messages stay intact here if required.






[GitHub] [kafka] dengziming commented on pull request #11889: KAFKA-13667: Make listeners mandatory in kraft mode

2022-05-02 Thread GitBox


dengziming commented on PR #11889:
URL: https://github.com/apache/kafka/pull/11889#issuecomment-1115729719

   @showuon 
   Oh yeah, I think I misunderstood your intention here. At first I thought it 
was weird to have a config that is optional for the broker node but mandatory 
for the controller node and the combined node, so I wanted to make it mandatory 
for all KRaft nodes.
   
   I think the problem here is just to make sure that both listeners are 
provided for the combined node, is that right?





[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-02 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863330742


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java:
##
@@ -51,95 +41,58 @@ public class ConnectProtocolCompatibilityTest {
 private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0);
 private ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0);
 
-@Rule
-public MockitoRule rule = MockitoJUnit.rule();
-
-@Mock
-private KafkaConfigBackingStore configStorage;
-private ClusterConfigState configState;
-
-@Before
-public void setup() {
-configStorage = mock(KafkaConfigBackingStore.class);
-configState = new ClusterConfigState(
-1L,
-null,
-Collections.singletonMap(connectorId1, 1),
-Collections.singletonMap(connectorId1, new HashMap<>()),
-Collections.singletonMap(connectorId1, TargetState.STARTED),
-Collections.singletonMap(taskId1x0, new HashMap<>()),
-Collections.emptySet());
-}
-
-@After
-public void teardown() {
-verifyNoMoreInteractions(configStorage);
-}

Review Comment:
   All of this is completely unnecessary and can be removed without diminishing 
testing coverage.






[GitHub] [kafka] C0urante commented on pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-02 Thread GitBox


C0urante commented on PR #11983:
URL: https://github.com/apache/kafka/pull/11983#issuecomment-1115554702

   Thanks @showuon, good call with the improvement to the serialization logic. 
Took a bit of legwork but I've pushed a change that implements that and also 
cleans up some testing clutter; LMKWYT.





[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-02 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863330276


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -336,13 +332,16 @@ ClusterAssignment performTaskAssignment(
 log.debug("Incremental connector assignments: {}", 
incrementalConnectorAssignments);
 log.debug("Incremental task assignments: {}", 
incrementalTaskAssignments);
 
+Map<String, Collection<String>> revokedConnectors = transformValues(toRevoke, ConnectorsAndTasks::connectors);
+Map<String, Collection<ConnectorTaskId>> revokedTasks = transformValues(toRevoke, ConnectorsAndTasks::tasks);
+
 return new ClusterAssignment(
 incrementalConnectorAssignments,
 incrementalTaskAssignments,
-transformValues(toRevoke, ConnectorsAndTasks::connectors),
-transformValues(toRevoke, ConnectorsAndTasks::tasks),
-connectorAssignments,
-taskAssignments
+revokedConnectors,
+revokedTasks,
+diff(connectorAssignments, revokedConnectors),
+diff(taskAssignments, revokedTasks)

Review Comment:
   I don't think so; it looks like we compute load-balancing revocations later 
on, around line 300. At line 279, the `completeWorkerAssignment` that we derive 
`connectorAssignments` and `taskAssignments` from only has the `deleted` 
connectors and tasks removed from it; everything else (including load-balancing 
revocations and duplicated assignments) is still included.






[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-02 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863329552


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java:
##
@@ -230,15 +230,16 @@ public static ExtendedWorkerState 
deserializeMetadata(ByteBuffer buffer) {
  *   ScheduledDelay => Int32
  * 
  */
-public static ByteBuffer serializeAssignment(ExtendedAssignment 
assignment) {
+public static ByteBuffer serializeAssignment(ExtendedAssignment 
assignment, boolean sessioned) {
 // comparison depends on reference equality for now
 if (assignment == null || 
ExtendedAssignment.empty().equals(assignment)) {
 return null;
 }
 Struct struct = assignment.toStruct();
-ByteBuffer buffer = 
ByteBuffer.allocate(CONNECT_PROTOCOL_HEADER_V1.sizeOf()
+Struct protocolHeader = sessioned ? CONNECT_PROTOCOL_HEADER_V2 : 
CONNECT_PROTOCOL_HEADER_V1;
+ByteBuffer buffer = ByteBuffer.allocate(protocolHeader.sizeOf()
 + 
ASSIGNMENT_V1.sizeOf(struct));
-CONNECT_PROTOCOL_HEADER_V1.writeTo(buffer);
+protocolHeader.writeTo(buffer);

Review Comment:
   Ah yeah, good call! Much cleaner than what we had before. It was a little 
more involved than I initially thought to make this change but IMO the end 
result is cleaner and easier to read, so hopefully it's worth the inflation in 
the diff here.






[jira] [Commented] (KAFKA-13859) SCRAM authentication issues with kafka-clients 3.0.1

2022-05-02 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17531010#comment-17531010
 ] 

Luke Chen commented on KAFKA-13859:
---

[~opayne] , thanks for the response. That confirms our investigation: the 
broker version is older than v2.8 and the client version is v3.0 or newer, so 
the exception is expected.

The workaround, as [~dengziming] suggested, is to disable the idempotent 
producer or to upgrade the broker to v2.8 or higher.

Thank you.

> SCRAM authentication issues with kafka-clients 3.0.1
> 
>
> Key: KAFKA-13859
> URL: https://issues.apache.org/jira/browse/KAFKA-13859
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.1
>Reporter: Oliver Payne
>Assignee: dengziming
>Priority: Major
>
> When attempting to produce records to Kafka using a client configured with 
> SCRAM authentication, the authentication is being rejected, and the following 
> exception is thrown:
> {{org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster 
> authorization failed.}}
> I am seeing this happen with a Spring Boot service that was recently upgraded 
> to 2.6.5. After looking into this, I learned that Spring Boot moved from 
> kafka-clients 3.0.0 to 3.0.1 in that version. And sure enough, downgrading 
> kafka-clients to 3.0.0 resolved the issue, with no changes made to the configs.
> I have also attempted to connect to a separate server with kafka-clients 
> 3.0.1, using plaintext authentication. That works fine. So the issue appears 
> to be with SCRAM authentication.
> I will note that I am attempting to connect to an AWS MSK instance. We use 
> SCRAM-SHA-512 as our sasl mechanism, using the basic {{ScramLoginModule.}} 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] showuon commented on pull request #12105: KAFKA-13859: Disable idempotence on SCRAM authentication

2022-05-02 Thread GitBox


showuon commented on PR #12105:
URL: https://github.com/apache/kafka/pull/12105#issuecomment-1115548710

   > To be clear, the broker would have to be older than 2.8 for the issue to 
occur. The server change for KIP-679 happened in Apache Kafka 2.8.
   
   Yes, the user confirmed that their broker version is v2.6.2 and their client 
version is v3.0.1, so this is the expected behavior. Thanks @dengziming and 
@ijuma !





[GitHub] [kafka] showuon commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-02 Thread GitBox


showuon commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863325064


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -336,13 +332,16 @@ ClusterAssignment performTaskAssignment(
 log.debug("Incremental connector assignments: {}", 
incrementalConnectorAssignments);
 log.debug("Incremental task assignments: {}", 
incrementalTaskAssignments);
 
+Map<String, Collection<String>> revokedConnectors = transformValues(toRevoke, ConnectorsAndTasks::connectors);
+Map<String, Collection<ConnectorTaskId>> revokedTasks = transformValues(toRevoke, ConnectorsAndTasks::tasks);
+
 return new ClusterAssignment(
 incrementalConnectorAssignments,
 incrementalTaskAssignments,
-transformValues(toRevoke, ConnectorsAndTasks::connectors),
-transformValues(toRevoke, ConnectorsAndTasks::tasks),
-connectorAssignments,
-taskAssignments
+revokedConnectors,
+revokedTasks,
+diff(connectorAssignments, revokedConnectors),
+diff(taskAssignments, revokedTasks)

Review Comment:
   I think the `connectorAssignments` should be equal to 
`diff(connectorAssignments, revokedConnectors)` and `taskAssignments == 
diff(taskAssignments, revokedTasks)`, because in L279:
   
https://github.com/apache/kafka/pull/11983/files#diff-e24067b121eb960feebfa099bd9c30382e330eaf5db39302a9d7a50e29b3acb4R279-R283
   
   We already removed the revoked connectors/tasks. Is my understanding correct?
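   For illustration, here is a minimal, self-contained sketch of the kind of 
per-worker map difference being discussed (`diff(connectorAssignments, 
revokedConnectors)`). The names and signatures are assumptions for this sketch, 
not the actual `IncrementalCooperativeAssignor` code:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class AssignmentDiff {

    // For each worker, remove the revoked items from its assigned items.
    // This sketches what a diff(assignments, revoked) helper might do; the
    // real helper in IncrementalCooperativeAssignor may differ in details.
    static Map<String, Collection<String>> diff(
            Map<String, Collection<String>> assignments,
            Map<String, Collection<String>> revoked) {
        Map<String, Collection<String>> result = new HashMap<>();
        for (Map.Entry<String, Collection<String>> entry : assignments.entrySet()) {
            Set<String> remaining = new HashSet<>(entry.getValue());
            remaining.removeAll(revoked.getOrDefault(entry.getKey(), Collections.emptySet()));
            result.put(entry.getKey(), remaining);
        }
        return result;
    }
}
```

   Note that if `assignments` already had the revoked items removed, such a 
`diff` would be a no-op for those entries, which is exactly the question being 
debated here.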






[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-02 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863308185



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -108,18 +107,15 @@ public Map performAssignment(String 
leaderId, String protoco
 log.debug("Max config offset root: {}, local snapshot config offsets 
root: {}",
   maxOffset, coordinator.configSnapshot().offset());
 
-short protocolVersion = memberConfigs.values().stream()
-.allMatch(state -> state.assignment().version() == 
CONNECT_PROTOCOL_V2)
-? CONNECT_PROTOCOL_V2
-: CONNECT_PROTOCOL_V1;
+short protocolVersion = 
ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion();

Review Comment:
   Yep, exactly 
   Should've known that when I implemented KIP-507 originally but was still 
getting my bearings with the group coordinator logic. Better late than never!
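   A rough sketch of the idea here, deriving the protocol version from the 
selected protocol name in the spirit of 
`ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion()`. The 
enum members and names below loosely mirror the real class but are assumptions 
for this sketch:

```java
class ProtocolCompat {

    // Map each Connect rebalance protocol name, as reported by the group
    // coordinator, to its version. Scanning member assignments (the old
    // approach) is unnecessary once the selected protocol name is known.
    enum Compatibility {
        EAGER("default", (short) 0),
        COMPATIBLE("compatible", (short) 1),
        SESSIONED("sessioned", (short) 2);

        final String protocolName;
        final short protocolVersion;

        Compatibility(String protocolName, short protocolVersion) {
            this.protocolName = protocolName;
            this.protocolVersion = protocolVersion;
        }

        static Compatibility fromProtocol(String protocol) {
            for (Compatibility c : values()) {
                if (c.protocolName.equals(protocol)) {
                    return c;
                }
            }
            throw new IllegalArgumentException("Unknown Connect protocol: " + protocol);
        }
    }
}
```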






[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing

2022-05-02 Thread GitBox


C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863308048


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -336,13 +332,16 @@ ClusterAssignment performTaskAssignment(
 log.debug("Incremental connector assignments: {}", 
incrementalConnectorAssignments);
 log.debug("Incremental task assignments: {}", 
incrementalTaskAssignments);
 
+Map<String, Collection<String>> revokedConnectors = transformValues(toRevoke, ConnectorsAndTasks::connectors);
+Map<String, Collection<ConnectorTaskId>> revokedTasks = transformValues(toRevoke, ConnectorsAndTasks::tasks);
+
 return new ClusterAssignment(
 incrementalConnectorAssignments,
 incrementalTaskAssignments,
-transformValues(toRevoke, ConnectorsAndTasks::connectors),
-transformValues(toRevoke, ConnectorsAndTasks::tasks),
-connectorAssignments,
-taskAssignments
+revokedConnectors,
+revokedTasks,
+diff(connectorAssignments, revokedConnectors),
+diff(taskAssignments, revokedTasks)

Review Comment:
   Yep! There was a bug here. FWIW the same issue is already fixed by 
https://github.com/apache/kafka/pull/12019.






[GitHub] [kafka] C0urante commented on pull request #12041: MINOR: ignore unused configuration when ConsumerCoordinator is not constructed

2022-05-02 Thread GitBox


C0urante commented on PR #12041:
URL: https://github.com/apache/kafka/pull/12041#issuecomment-1115515896

   Thanks Guozhang. I think the cost of logging warnings in cases like this is 
fairly low as users can and should adjust their configurations to not use 
nonsensical properties, and the benefit can be high in the event that a user is 
confused about client behavior. I do sympathize with concerns that the warning 
for an unused property may make it seem like the property is unconditionally 
unrecognized (i.e., not defined by a client at all) instead of conditionally 
unrecognized (i.e., not used because of other properties).
   
   One alternative could be to use the newly-introduced 
[ConnectUtils::ensureProperty](https://github.com/apache/kafka/blob/8245c9a3d5af2ad891844194a5fa281af471b568/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java#L77-L101)
 or something similar to it (possibly one that logs a warning if _any_ value 
for a specific property is given, regardless of whether it matches the 
default). This way, we could continue logging warnings for cases like these, 
but make it clear exactly why the property should not be included in the config.
   
   Either way, I think the piecemeal logic introduced in this PR is suboptimal. 
Dedicating one line for every to-be-ignored property is unnecessary if we want 
to remove these warnings for all properties defined by a client; in that case, 
we can use the [approach I described 
earlier](https://github.com/apache/kafka/pull/12041#issuecomment-1100583118), 
which will be easier to maintain and take up less space.
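   As a rough illustration of the "warn on a conditionally unused property" 
idea, here is a minimal sketch under assumed names. This is not the actual 
`ConnectUtils::ensureProperty` API, just the shape of the behavior being 
discussed:

```java
import java.util.Map;

class ConfigWarnings {

    // Hypothetical helper: if the user supplied a value for a property that
    // will be ignored in this context, build a warning message explaining why
    // (a real implementation would log it) and remove the property so
    // downstream clients never see it. Returns null if nothing was set.
    static String warnIfSet(Map<String, Object> config, String property, String reason) {
        if (config.containsKey(property)) {
            Object value = config.remove(property);
            return String.format("Ignoring '%s=%s': %s", property, value, reason);
        }
        return null;
    }
}
```

   Unlike a warning about an unrecognized property, the `reason` string makes 
it clear the property is defined but conditionally unused.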





[jira] [Commented] (KAFKA-13857) The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition

2022-05-02 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17531003#comment-17531003
 ] 

RivenSun commented on KAFKA-13857:
--

[~guozhang] Thank you for your reply and approval.
I don't have a strong need to query the LEO right now; this feature can be 
supported in a future release.
I may be busy with other things for a while, so anyone interested in this can 
assign the JIRA to themselves.
Thanks.

> The listOffsets method of KafkaAdminClient should support returning 
> logEndOffset of topicPartition
> --
>
> Key: KAFKA-13857
> URL: https://issues.apache.org/jira/browse/KAFKA-13857
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: RivenSun
>Priority: Major
>
> The server side currently handles the LIST_OFFSETS request process as follows:
> {code:java}
> KafkaApis.handleListOffsetRequest() ->
> KafkaApis.handleListOffsetRequestV1AndAbove() ->
> ReplicaManager.fetchOffsetForTimestamp() ->
> Partition.fetchOffsetForTimestamp(){code}
>  
> In the last method above, it is obvious that when the client side does not 
> pass the isolationLevel value, the server side supports returning 
> localLog.logEndOffset.
> {code:java}
> val lastFetchableOffset = isolationLevel match {
>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
>   case None => localLog.logEndOffset
> } 
> {code}
>  
>  
> KafkaAdminClient is an operations and maintenance management tool, which 
> *should be different from the listOffsets-related methods (offsetsForTimes, 
> beginningOffsets, endOffsets) provided by KafkaConsumer, and it should not 
> be limited by the value of isolationLevel in the ListOffsetsOptions 
> parameter.*
> In the current KafkaAdminClient.listOffsets() method, both the AdminClient 
> and the server consider isolationLevel as a required parameter:
> 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException 
> will be thrown when AdminClient executes listOffsets() method.
> {code:java}
> ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
> 2) The current logic for converting isolationLevel on the server side has not 
> yet handled the case where the user passes in a value that is neither 
> READ_UNCOMMITTED nor READ_COMMITTED :
> {code:java}
> val isolationLevelOpt = if (isClientRequest)
>   Some(offsetRequest.isolationLevel)
> else
>   None {code}
> {code:java}
> public IsolationLevel isolationLevel() {
> return IsolationLevel.forId(data.isolationLevel());
> } {code}
> h2. Suggestion:
> Add a new enum value `NONE` in IsolationLevel, dedicated only to the 
> AdminClient.listOffsets() method.
> This change may cause the highestSupportedVersion of 
> ApiMessageType.LIST_OFFSETS to increase by one.
>  
>  





[GitHub] [kafka] LeonSamuel opened a new pull request, #12115: Update README.md

2022-05-02 Thread GitBox


LeonSamuel opened a new pull request, #12115:
URL: https://github.com/apache/kafka/pull/12115

   As someone new to Kafka, it would have been welcome to see a succinct 
high-level overview of what Kafka is as the first piece of documentation. When 
I visited the home page of the Kafka website, I read about seemingly 
disconnected pieces of what Kafka could do, but the project never took on a 
tangible form or differentiated itself from similar scaling systems. This 
leaves the user to either quit or push on into a third round of research, now 
with more apprehension. I thought my additions (quoted from the Kafka website) 
would add some clarity and lower the barrier for those who want to quickly 
learn about the system.
   
   
   ### Committer Checklist (excluded from commit message)
   - [X] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] hachikuji commented on a diff in pull request #12106: KAFKA-13861: Fix the validateOnly behavior for CreatePartitions requests in KRaft mode

2022-05-02 Thread GitBox


hachikuji commented on code in PR #12106:
URL: https://github.com/apache/kafka/pull/12106#discussion_r863259182


##
core/src/test/scala/unit/kafka/server/ControllerApisTest.scala:
##
@@ -730,8 +729,45 @@ class ControllerApisTest {
 request.topics().add(new 
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
 request.topics().add(new 
CreatePartitionsTopic().setName("baz").setAssignments(null).setCount(5))
 assertEquals(Set(new CreatePartitionsTopicResult().setName("foo").
-setErrorCode(NONE.code()).
-setErrorMessage(null),
+  setErrorCode(NONE.code()).
+  setErrorMessage(null),
+  new CreatePartitionsTopicResult().setName("bar").
+setErrorCode(INVALID_REQUEST.code()).
+setErrorMessage("Duplicate topic name."),
+  new CreatePartitionsTopicResult().setName("baz").
+setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
+setErrorMessage(null)),
+  controllerApis.createPartitions(ANONYMOUS_CONTEXT, request,
+_ => Set("foo", "bar")).get().asScala.toSet)
+  }
+
+  @Test
+  def testValidateOnlyCreatePartitionsRequest(): Unit = {

Review Comment:
   Is this test basically the same as `testCreatePartitionsRequest`? Maybe we 
can get rid of one and turn the other into a `@ParameterizedTest` with 
`validateOnly` as the parameter?



##
metadata/src/main/java/org/apache/kafka/controller/Controller.java:
##
@@ -328,11 +328,15 @@ CompletableFuture 
updateFeatures(
  * Create partitions on certain topics.
  *
  * @param topicsThe list of topics to create partitions for.
+ * @param validateOnly  If true, create partitions is just validated and 
returns response

Review Comment:
   nit: how about this?
   > If true, the request is validated, but no partitions will be created.



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1364,7 +1364,8 @@ Boolean isBrokerUnfenced(int brokerId) {
 setErrorCode(apiError.error().code()).
 setErrorMessage(apiError.message()));
 }
-return new ControllerResult<>(records, results, true);
+log.debug("CreatePartitions result(s): {}", results);

Review Comment:
   Hmm.. It is useful to know in the logs if `validateOnly` was set. 



##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -499,23 +504,28 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   e = assertThrows(classOf[ExecutionException], () => 
alterResult.values.get(topic2).get,
 () => s"$desc: Expect InvalidPartitionsException when requesting a 
noop")
   assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc)
-  assertEquals("Topic already has 3 partitions.", e.getCause.getMessage, 
desc)
+  exceptionMsgStr = if (TestInfoUtils.isKRaft(testInfo)) {
+"Topic already has 3 partition(s)."
+  } else {
+"Topic already has 3 partitions."

Review Comment:
   I wonder if we can unify the error messages? The differences do not seem 
interesting.






[jira] [Commented] (KAFKA-13867) Improve JavaDoc for MetadataVersion.java

2022-05-02 Thread Alyssa Huang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530971#comment-17530971
 ] 

Alyssa Huang commented on KAFKA-13867:
--

^ just confirming that you mean `MetadataVersion#ibpVersion`?

> Improve JavaDoc for MetadataVersion.java
> 
>
> Key: KAFKA-13867
> URL: https://issues.apache.org/jira/browse/KAFKA-13867
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Minor
>






[GitHub] [kafka] cmccabe commented on pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-05-02 Thread GitBox


cmccabe commented on PR #12072:
URL: https://github.com/apache/kafka/pull/12072#issuecomment-1115466993

   Thanks for this PR, @ahuang98. And thanks to everyone who reviewed. I filed 
https://issues.apache.org/jira/browse/KAFKA-13867 for two very minor issues 
that we discussed here (one javadoc improvement issue, one small field name 
issue).





[jira] [Commented] (KAFKA-13867) Improve JavaDoc for MetadataVersion.java

2022-05-02 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530969#comment-17530969
 ] 

Colin McCabe commented on KAFKA-13867:
--

I also suggest renaming `MetadataVersion#version` to `MetadataVersion#fullName`

> Improve JavaDoc for MetadataVersion.java
> 
>
> Key: KAFKA-13867
> URL: https://issues.apache.org/jira/browse/KAFKA-13867
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Minor
>






[jira] [Created] (KAFKA-13867) Improve JavaDoc for MetadataVersion.java

2022-05-02 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-13867:


 Summary: Improve JavaDoc for MetadataVersion.java
 Key: KAFKA-13867
 URL: https://issues.apache.org/jira/browse/KAFKA-13867
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe








[GitHub] [kafka] cmccabe commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-05-02 Thread GitBox


cmccabe commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r863258243


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+/**
+ * This class contains the different Kafka versions.
+ * Right now, we use them for upgrades - users can configure the version of 
the API brokers will use to communicate between themselves.
+ * This is only for inter-broker communications - when communicating with 
clients, the client decides on the API version.
+ *
+ * Note that the ID we initialize for each version is important.
+ * We consider a version newer than another if it is lower in the enum list 
(to avoid depending on lexicographic order)
+ *
+ * Since the api protocol may change more than once within the same release 
and to facilitate people deploying code from
+ * trunk, we have the concept of internal versions (first introduced during 
the 0.10.0 development cycle). For example,
+ * the first time we introduce a version change in a release, say 0.10.0, we 
will add a config value "0.10.0-IV0" and a
+ * corresponding enum constant IBP_0_10_0-IV0. We will also add a config value 
"0.10.0" that will be mapped to the
+ * latest internal version object, which is IBP_0_10_0-IV0. When we change the 
protocol a second time while developing
+ * 0.10.0, we will add a new config value "0.10.0-IV1" and a corresponding 
enum constant IBP_0_10_0-IV1. We will change
+ * the config value "0.10.0" to map to the latest internal version 
IBP_0_10_0-IV1. The config value of
+ * "0.10.0-IV0" is still mapped to IBP_0_10_0-IV0. This way, if people are 
deploying from trunk, they can use
+ * "0.10.0-IV0" and "0.10.0-IV1" to upgrade one internal version at a time. 
For most people who just want to use
+ * released version, they can use "0.10.0" when upgrading to the 0.10.0 
release.
+ */
+public enum MetadataVersion {
+IBP_0_8_0(-1),
+IBP_0_8_1(-1),
+IBP_0_8_2(-1),
+IBP_0_9_0(-1),
+
+// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+IBP_0_10_0_IV0(-1),
+
+// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+IBP_0_10_0_IV1(-1),
+
+// introduced for JoinGroup protocol change in KIP-62
+IBP_0_10_1_IV0(-1),
+
+// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+IBP_0_10_1_IV1(-1),
+
+// introduced ListOffsetRequest v1 in KIP-79
+IBP_0_10_1_IV2(-1),
+
+// introduced UpdateMetadataRequest v3 in KIP-103
+IBP_0_10_2_IV0(-1),
+
+// KIP-98 (idempotent and transactional producer support)
+IBP_0_11_0_IV0(-1),
+
+// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+IBP_0_11_0_IV1(-1),
+
+// Introduced leader epoch fetches to the replica fetcher via KIP-101
+IBP_0_11_0_IV2(-1),
+
+// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+IBP_1_0_IV0(-1),
+
+// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+// and KafkaStorageException for fetch requests.
+IBP_1_1_IV0(-1),
+
+// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+IBP_2_0_IV0(-1),
+
+// Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+IBP_2_0_IV1(-1),
+
+// Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+IBP_2_1_IV0(-1),
+
+// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+IBP_2_1_IV1(-1),
+
+// Support ZStandard Compression Codec (KIP-110)
+IBP_2_1_IV2(-1),
+
+// Introduced broker generation (KIP-380), and
+// LeaderAndIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+IBP_2_2_IV0(-1),
+
+// New 

[GitHub] [kafka] cmccabe merged pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-05-02 Thread GitBox


cmccabe merged PR #12072:
URL: https://github.com/apache/kafka/pull/12072





[GitHub] [kafka] cmccabe commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-05-02 Thread GitBox


cmccabe commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r863255928


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.ConfigDef.Validator;
+import org.apache.kafka.common.config.ConfigException;
+
+public class MetadataVersionValidator implements Validator {
+
+@Override
+public void ensureValid(String name, Object value) {
+try {
+MetadataVersion.fromVersionString(value.toString());
+} catch (IllegalArgumentException e) {
+throw new ConfigException(name, value.toString(), e.getMessage());
+}
+}
+
+@Override
+public String toString() {

Review Comment:
   > I'm not sure if I follow the discussion on the toString location, are we 
saying that it should be moved?
   
   I think the proposal was to move `MetadataVersionValidator` to a (static?) 
inner class of `MetadataVersion`. 
   
   In general it's nice to keep the source files small, though, I think, so I'd 
recommend leaving as-is.
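For context, the `Validator` contract under discussion can be sketched in a self-contained way: `ensureValid` throws on a bad value, and `toString()` supplies the "Valid values" text in generated config docs. The tiny interface and version list below are stand-ins for `ConfigDef.Validator` and the real `MetadataVersion` values, purely for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Stand-in for org.apache.kafka.common.config.ConfigDef.Validator, just to
// illustrate the contract MetadataVersionValidator implements above.
interface Validator {
    void ensureValid(String name, Object value);
}

public class VersionValidatorSketch implements Validator {
    // Illustrative version strings, not the real MetadataVersion list.
    private static final List<String> VALID = Arrays.asList("3.0-IV1", "3.1-IV0", "3.2-IV0");

    @Override
    public void ensureValid(String name, Object value) {
        // Throws on an unknown value, like the ConfigException in the snippet above.
        if (!VALID.contains(value.toString())) {
            throw new IllegalArgumentException(
                "Invalid value " + value + " for configuration " + name + ": " + this);
        }
    }

    @Override
    public String toString() {
        // toString() is what generated config documentation renders.
        return "[" + String.join(", ", VALID) + "]";
    }

    public static void main(String[] args) {
        Validator v = new VersionValidatorSketch();
        v.ensureValid("inter.broker.protocol.version", "3.2-IV0"); // passes silently
        System.out.println(v); // [3.0-IV1, 3.1-IV0, 3.2-IV0]
    }
}
```

Because `toString()` doubles as user-facing documentation, keeping the validator as a standalone class (as recommended above) does not change this behavior either way.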






[GitHub] [kafka] akhileshchg commented on a diff in pull request #12106: KAFKA-13861: Fix the validateOnly behavior for CreatePartitions requests in KRaft mode

2022-05-02 Thread GitBox


akhileshchg commented on code in PR #12106:
URL: https://github.com/apache/kafka/pull/12106#discussion_r863179905


##
core/src/test/java/kafka/test/MockController.java:
##
@@ -410,10 +410,16 @@ public CompletableFuture 
updateFeatures(
 throw new UnsupportedOperationException();
 }
 
+boolean lastCreatePartitionsValidateOnly = false;

Review Comment:
   Added a new unit test and removed `lastCreatePartitionsValidateOnly`






[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530937#comment-17530937
 ] 

François Rosière commented on KAFKA-13864:
--

[~cadonna], table updated and discussion started.

> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Assignee: lqjacklee
>Priority: Major
>  Labels: needs-kip
> Attachments: interceptor_constructor_client.patch
>
>
> To allow implementing Spring managed interceptors for producers and consumers,
> [https://github.com/spring-projects/spring-kafka/issues/2244]
> a new constructor should be added in KafkaProducer
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
> Serializer valueSerializer){code}
> the visibility of one constructor of KafkaConsumer should also move from 
> default to public.
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
> Deserializer valueDeserializer) {code}
> see the current implementation 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671
> This issue is quite blocking , so, any other alternative or proposal would be 
> more than welcome.
> Kafka streams is not concerned by this issue as the KafkaStreams object is 
> already exposing a constructor taking a StreamsConfig object.
> Thanks for considering this issue.





[jira] [Comment Edited] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530937#comment-17530937
 ] 

François Rosière edited comment on KAFKA-13864 at 5/2/22 8:44 PM:
--

[~cadonna], table updated and discussion started. Thanks for the infos.


was (Author: JIRAUSER288866):
[~cadonna], table updated and discussion started.

> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Assignee: lqjacklee
>Priority: Major
>  Labels: needs-kip
> Attachments: interceptor_constructor_client.patch
>
>
> To allow implementing Spring managed interceptors for producers and consumers,
> [https://github.com/spring-projects/spring-kafka/issues/2244]
> a new constructor should be added in KafkaProducer
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
> Serializer valueSerializer){code}
> the visibility of one constructor of KafkaConsumer should also move from 
> default to public.
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
> Deserializer valueDeserializer) {code}
> see the current implementation 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671
> This issue is quite blocking , so, any other alternative or proposal would be 
> more than welcome.
> Kafka streams is not concerned by this issue as the KafkaStreams object is 
> already exposing a constructor taking a StreamsConfig object.
> Thanks for considering this issue.





[GitHub] [kafka] akhileshchg commented on a diff in pull request #12106: KAFKA-13861: Fix the validateOnly behavior for CreatePartitions requests in KRaft mode

2022-05-02 Thread GitBox


akhileshchg commented on code in PR #12106:
URL: https://github.com/apache/kafka/pull/12106#discussion_r863095080


##
core/src/test/java/kafka/test/MockController.java:
##
@@ -410,10 +410,16 @@ public CompletableFuture 
updateFeatures(
 throw new UnsupportedOperationException();
 }
 
+boolean lastCreatePartitionsValidateOnly = false;

Review Comment:
   The `MockController` doesn't maintain any state of the topics/partitions 
that are created and deleted. So, I thought this was a more straightforward fix 
to check if the `validateOnly` flag is used. Please let me know if you have 
some other ideas.



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1364,7 +1364,25 @@ Boolean isBrokerUnfenced(int brokerId) {
 setErrorCode(apiError.error().code()).
 setErrorMessage(apiError.message()));
 }
-return new ControllerResult<>(records, results, true);
+StringBuilder resultsBuilder = new StringBuilder();

Review Comment:
   Sure. That seems reasonable. I did this to match the `createTopics` log 
statement.



##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -767,7 +767,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   setErrorCode(TOPIC_AUTHORIZATION_FAILED.code))
   }
 }
-controller.createPartitions(context, topics).thenApply { results =>
+controller.createPartitions(context, topics, 
request.validateOnly()).thenApply { results =>

Review Comment:
   Done.



##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -767,7 +767,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   setErrorCode(TOPIC_AUTHORIZATION_FAILED.code))
   }
 }
-controller.createPartitions(context, topics).thenApply { results =>
+controller.createPartitions(context, topics, 
request.validateOnly()).thenApply { results =>

Review Comment:
   I enabled the test for both KRaft and Zk modes now. The existing test is 
robust and covers all the cases.



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1664,13 +1664,14 @@ public CompletableFuture 
updateFeatures(
 @Override
 public CompletableFuture> 
createPartitions(

Review Comment:
   That's a good idea. Will do this.






[jira] [Updated] (KAFKA-13636) Committed offsets could be deleted during a rebalance if a group did not commit for a while

2022-05-02 Thread Andrew Olson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-13636:
-
Description: 
The group coordinator might delete invalid offsets during a group rebalance. 
During a rebalance, the coordinator is relying on the last commit timestamp 
({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state 
modification timestamp ({_}currentStateTimestamp{_}) to detect expired offsets.

 

This is relatively easy to reproduce by playing with 
group.initial.rebalance.delay.ms, offset.retention.minutes and 
offset.check.retention.interval, I uploaded an example on: 
[https://github.com/Dabz/kafka-example/tree/master/docker/offsets-retention] .

This script does:
 * Start a broker with: offset.retention.minutes=2, 
offset.check.retention.interval.ms=1000, group.initial.rebalance.delay=2
 * Produce 10 messages
 * Create a consumer group to consume the 10 messages, with auto.commit 
disabled so it only commits a few times
 * Wait 3 minutes, then {{kill -9}} the consumer
 * Restart the consumer after a few seconds
 * The consumer restarts from {{auto.offset.reset}}; the offsets were removed

 

The cause is in GroupMetadata.scala:
 * When the group gets emptied, {{subscribedTopics}} is set to {{Set.empty}} 
([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L520-L521])
 * When a new member joins, we add the new member to the group right away; 
BUT {{subscribedTopics}} is only updated once the rebalance is over (in 
{{initNewGeneration}}), which could take a while due to 
{{group.initial.rebalance.delay}}
 * When the log cleaner executes, {{subscribedTopics.isDefined}} returns 
true because {{Set.empty != None}} (the underlying condition)
 * Thus we enter 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L782-L785]
 with an empty {{subscribedTopics}} list and rely on the 
{{commitTimestamp}} regardless of the {{currentStateTimestamp}}

 

This seems to be a regression introduced by KIP-496 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets#KIP496:AdministrativeAPItodeleteconsumeroffsets-ProposedChanges
 (KAFKA-8338, KAFKA-8370)

  was:
The group coordinator might delete invalid offsets during a group rebalance. 
During a rebalance, the coordinator is relying on the last commit timestamp 
({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state 
modification {_}timestampt (currentStateTimestamp{_}) to detect expired offsets.

 

This is relatively easy to reproduce by playing with 
group.initial.rebalance.delay.ms, offset.retention.minutes and 
offset.check.retention.interval, I uploaded an example on: 
[https://github.com/Dabz/kafka-example/tree/master/docker/offsets-retention] .

This script does:
 * Start a broker with: offset.retention.minute=2, 
o[ffset.check.retention.interval.ms=|http://offset.check.retention.interval.ms/]1000,
  group.initial.rebalance.delay=2
 * Produced 10 messages
 * Create a consumer group to consume 10 messages, and disable auto.commit to 
only commit a few times
 * Wait 3 minutes, then the Consumer get a {{kill -9}}
 * Restart the consumer after a few seconds
 * The consumer restart from {{auto.offset.reset}} , the offset got removed

 

The cause is due to the GroupMetadata.scala:
 * When the group get emptied, the {{subscribedTopics}} is set to {{Set.empty}} 
([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L520-L521])
 * When the new member joins, we add the new member right away in the group ; 
BUT the {{subscribedTopics}} is only updated once the migration is over (in the 
initNewGeneration) (which could take a while due to the 
{{{}group.initial.rebalance.delay{}}})
 * When the log cleaner got executed,  {{subscribedTopics.isDefined}} returns 
true as {{Set.empty != None}} (the underlying condition)
 * Thus we enter 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L782-L785]
 with an empty {{subscribedTopics}} list and we are relying on the 
{{commitTimestamp}} regardless of the {{currentStateTimestamp}}

 

This seem to be a regression generated by KIP-496 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets#KIP496:AdministrativeAPItodeleteconsumeroffsets-ProposedChanges
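The `Set.empty != None` condition at the heart of this bug can be illustrated in a self-contained way. The sketch below uses Java's `Optional` as an analogue of Scala's `Option`; the class and method names are illustrative, not Kafka's actual code.

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

// Sketch of the condition described above: an Optional holding an *empty*
// set is still "defined", so the expiration path takes the subscribed-topics
// branch even though the group has no subscription information yet, and
// offsets are then expired purely on commitTimestamp, ignoring
// currentStateTimestamp.
public class ExpirationSketch {
    public static boolean usesCommitTimestampOnly(Optional<Set<String>> subscribedTopics) {
        // Mirrors Scala's `subscribedTopics.isDefined`: true for
        // Optional.of(empty set) -- the buggy case.
        return subscribedTopics.isPresent();
    }

    public static void main(String[] args) {
        System.out.println(usesCommitTimestampOnly(Optional.of(Collections.<String>emptySet()))); // true
        System.out.println(usesCommitTimestampOnly(Optional.empty())); // false
    }
}
```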


> Committed offsets could be deleted during a rebalance if a group did not 
> commit for a while
> ---
>
> Key: KAFKA-13636
> URL: https://issues.apache.org/jira/browse/KAFKA-13636
> Project: Kafka
>  

[GitHub] [kafka] cadonna commented on a diff in pull request #12114: MINOR: Note that slf4j-log4j in version 1.7.35+ should be used

2022-05-02 Thread GitBox


cadonna commented on code in PR #12114:
URL: https://github.com/apache/kafka/pull/12114#discussion_r863082871


##
docs/upgrade.html:
##
@@ -73,7 +73,11 @@ Notable changes in 3
 via Connect worker and/or connector configuration. Connect may 
enable idempotent producers
 by default in a future major release.
 Kafka has replaced log4j and slf4j-log4j12 with reload4j and 
slf4j-reload4j due to security concerns.

Review Comment:
   @ijuma changed the text as you proposed.
   
   I will cherry-pick the commit to 3.2 and 3.1.
   
   @tombentley Since this is just a doc change, we do not need a new RC, right? 
We can port the change directly to the doc repo in case RC1 passes the votes.






[jira] [Created] (KAFKA-13866) Support more advanced time retention policies

2022-05-02 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-13866:
---

 Summary: Support more advanced time retention policies
 Key: KAFKA-13866
 URL: https://issues.apache.org/jira/browse/KAFKA-13866
 Project: Kafka
  Issue Type: Improvement
  Components: config, core, log cleaner
Reporter: Matthias J. Sax


The time-based retention policy compares the record timestamp to broker 
wall-clock time. Those semantics are questionable and also lead to issues for 
data reprocessing: if one wants to re-process data older than the retention 
time, it's not possible, as the broker expires those records aggressively and 
the user needs to increase the retention time accordingly.

Especially for Kafka Streams, we have seen many cases where users were bitten 
by the current behavior.

It would be best if Kafka tracked _two_ timestamps per record: the record 
event-time (as the broker does currently), plus the log append-time (which is 
currently only tracked if the topic is configured with "append-time" tracking; 
the issue is that it overwrites the producer-provided record event-time).

Tracking both timestamps would allow setting a pure wall-clock-time retention 
policy plus a pure event-time retention policy:
 * Wall-clock time: keep the data (at least) X days after writing
 * Event-time: keep (at most) X days' worth of event-time data

Comparing wall-clock time to wall-clock time and event-time to event-time 
provides much cleaner semantics. The idea is to combine both policies and only 
expire data if both policies trigger.

For the event-time policy, the broker would need to track "stream time" as the 
max event-timestamp it has seen per partition (similar to how Kafka Streams 
tracks "stream time" client-side).

Note the difference between "at least" and "at most" above: the max-based 
event-time policy prevents the broker from keeping a huge history in the 
data-reprocessing case.

The details of how wall-clock/event-time and min/max policies could be 
combined would be part of a KIP discussion. For example, it might also be 
useful to have the following policy: keep at least X days' worth of event-time 
history no matter how long the data has already been stored (i.e., there would 
only be an event-time-based expiration, not a wall-clock one). It could also 
be combined with a wall-clock-time expiration: delete data only after it's at 
least X days old and stored for at least Y days.
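The combined policy proposed here — expire a record only when both the wall-clock rule and the event-time rule trigger — can be sketched as a single predicate. This is a hypothetical illustration of the ticket's idea, not broker code; all names and parameters below are made up.

```java
// Hypothetical sketch of the combined retention check: a record is only
// eligible for deletion when BOTH the wall-clock policy ("at least X after
// append") and the event-time policy ("at most Y of event-time history,
// relative to per-partition stream time") agree.
public class RetentionSketch {
    public static boolean expired(long nowMs, long appendTimeMs, long streamTimeMs,
                                  long recordEventTimeMs,
                                  long wallClockRetentionMs, long eventTimeRetentionMs) {
        // Wall-clock vs wall-clock: how long ago was the record written?
        boolean wallClockExpired = nowMs - appendTimeMs >= wallClockRetentionMs;
        // Event-time vs event-time: how far behind max observed stream time is it?
        boolean eventTimeExpired = streamTimeMs - recordEventTimeMs >= eventTimeRetentionMs;
        return wallClockExpired && eventTimeExpired; // only expire when both trigger
    }

    public static void main(String[] args) {
        // Old by wall-clock but still within the event-time window: retained.
        System.out.println(expired(10_000, 0, 5_000, 4_500, 1_000, 1_000)); // false
        // Old by both measures: expired.
        System.out.println(expired(10_000, 0, 5_000, 1_000, 1_000, 1_000)); // true
    }
}
```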





[GitHub] [kafka] ahuang98 commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-05-02 Thread GitBox


ahuang98 commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r863014438


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.ConfigDef.Validator;
+import org.apache.kafka.common.config.ConfigException;
+
+public class MetadataVersionValidator implements Validator {
+
+@Override
+public void ensureValid(String name, Object value) {
+try {
+MetadataVersion.fromVersionString(value.toString());
+} catch (IllegalArgumentException e) {
+throw new ConfigException(name, value.toString(), e.getMessage());
+}
+}
+
+@Override
+public String toString() {

Review Comment:
   I'll remove the `distinct` call. 
   I'm not sure if I follow the discussion on the `toString` location, are we 
saying that it should be moved?






[GitHub] [kafka] mimaison commented on pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

2022-05-02 Thread GitBox


mimaison commented on PR #11773:
URL: https://github.com/apache/kafka/pull/11773#issuecomment-1115108546

   Sorry @C0urante for the delays, we were at Kafka Summit last week and I'm 
still trying to catch up on stuff. I'm hoping to take another look this week.





[GitHub] [kafka] mimaison commented on pull request #12067: KAFKA-13780: Generate OpenAPI file for Connect REST API

2022-05-02 Thread GitBox


mimaison commented on PR #12067:
URL: https://github.com/apache/kafka/pull/12067#issuecomment-1115103573

   @kkonstantine @rhauch Can you take a look?





[GitHub] [kafka] cmccabe merged pull request #12107: MINOR: fix ClientQuotasRequestTest.testAlterClientQuotasBadIp

2022-05-02 Thread GitBox


cmccabe merged PR #12107:
URL: https://github.com/apache/kafka/pull/12107





[GitHub] [kafka] C0urante commented on a diff in pull request #11874: Fix typos in configuration docs

2022-05-02 Thread GitBox


C0urante commented on code in PR #11874:
URL: https://github.com/apache/kafka/pull/11874#discussion_r862970744


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -216,8 +216,10 @@
 private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = 
"The maximum number of unacknowledged requests the client will send on a single 
connection before blocking."
 + 
" Note that if this config is set to be greater than 1 and 
enable.idempotence is set to false, there is a risk of"
 + 
" message re-ordering after a failed send due to retries (i.e., if retries are 
enabled)."
-+ 
" Additionally, enabling idempotence requires this config value to be less than 
or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
-+ 
" If conflicting configurations are set and idempotence is not explicitly 
enabled, idempotence is disabled.";
++ 
" Additionally, enabling idempotence requires the value of this configuration 
to be less than or equal to " + 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
++ 
" If conflicting configurations are set and idempotence is not explicitly 
enabled, idempotence is disabled. "
++ 
" Record ordering is preserved when enable.idempotence is set to 
true for idempotent "
++ 
" producer (or transactional producer), even when max in-flight requests are 
greater than 1 (supported up to 5).";

Review Comment:
   The last sentence is redundant though, isn't it? The docs already state:
   
   > Note that if this config is set to be greater than 1 and 
enable.idempotence is set to false, there is a risk of message 
re-ordering after a failed send due to retries (i.e., if retries are enabled).
   
   It's fine if we want to clarify that re-ordering is not a risk when 
`enable.idempotence` is set to true, but we should also try to keep the docs 
here concise.
   
   What about adding a brief followup in that sentence instead?
   
   > Note that if this config is set to be greater than 1 and 
enable.idempotence is set to false, there is a risk of message 
re-ordering after a failed send due to retries (i.e., if retries are enabled); 
ordering will be preserved if retries are disabled and/or 
enable.idempotence is true.
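The ordering guarantee discussed above can be captured as a producer configuration fragment. The property keys are standard Kafka client configs; the class wrapper is only there to make the snippet self-contained.

```java
import java.util.Properties;

// Producer settings reflecting the doc text above: with idempotence enabled,
// ordering is preserved even with multiple in-flight requests (up to 5).
public class ProducerOrderingConfig {
    public static Properties idempotentOrderingProps() {
        Properties props = new Properties();
        props.put("enable.idempotence", "true");
        // With idempotence, this must be <= 5; ordering still holds.
        props.put("max.in.flight.requests.per.connection", "5");
        props.put("acks", "all"); // idempotence requires acks=all
        return props;
    }

    public static void main(String[] args) {
        System.out.println(idempotentOrderingProps().getProperty("enable.idempotence")); // true
    }
}
```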






[jira] [Commented] (KAFKA-13859) SCRAM authentication issues with kafka-clients 3.0.1

2022-05-02 Thread Oliver Payne (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530796#comment-17530796
 ] 

Oliver Payne commented on KAFKA-13859:
--

Sorry for the late response. I see that this has already been marked resolved, 
but wanted to answer the questions I left hanging here.
[~dengziming] The following exception is coming from the client logs:

 
{code:java}
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested 
exception is org.apache.kafka.common.errors.ClusterAuthorizationException: 
Cluster authorization failed.
    at 
org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$6(KafkaTemplate.java:690)
 ~[spring-kafka-2.8.5.jar:2.8.5]
    at 
org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer$1.onCompletion(DefaultKafkaProducerFactory.java:1001)
 ~[spring-kafka-2.8.5.jar:2.8.5]
    at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1350)
 ~[kafka-clients-3.0.1.jar:na]
    at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273)
 ~[kafka-clients-3.0.1.jar:na]
    at 
org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:161)
 ~[kafka-clients-3.0.1.jar:na]
    at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:773)
 ~[kafka-clients-3.0.1.jar:na]
    at 
org.apache.kafka.clients.producer.internals.Sender.maybeAbortBatches(Sender.java:498)
 ~[kafka-clients-3.0.1.jar:na]
    at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:307) 
~[kafka-clients-3.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) 
~[kafka-clients-3.0.1.jar:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: 
Cluster authorization failed. {code}
Our broker version is 2.6.2

 

Here are our producer configs:
{code:java}
"security.protocol" -> "SASL_SSL"
"value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
"sasl.mechanism" -> "SCRAM-SHA-512"
"sasl.jaas.config" -> "org.apache.kafka.common.security.scram.ScramLoginModule required username="redacted" password="redacted";"
"bootstrap.servers" -> "server-name-redacted1:9096, server-name-redacted2:9096, server-name-redacted3:9096"
"key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
"ssl.endpoint.identification.algorithm" -> "https" {code}
 

 

I also set enable.idempotence = false per your recommendation, and it resolved the issue. Thanks for the suggestion.
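The workaround discussed here (SCRAM over SASL_SSL with idempotence disabled) can be sketched as plain producer properties. This is a hedged illustration, not the reporter's actual code; broker addresses and credentials are placeholders:

```java
import java.util.Properties;

// Sketch of the SCRAM producer setup with the idempotence workaround applied.
class ScramProducerProps {
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9096,broker2:9096"); // placeholders
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
            + "username=\"user\" password=\"secret\";"); // placeholder credentials
        // kafka-clients 3.0.1 enables idempotence by default, which requires
        // IDEMPOTENT_WRITE authorization on older brokers; disabling it avoids
        // the ClusterAuthorizationException reported in this ticket.
        props.put("enable.idempotence", "false");
        return props;
    }
}
```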

 

> SCRAM authentication issues with kafka-clients 3.0.1
> 
>
> Key: KAFKA-13859
> URL: https://issues.apache.org/jira/browse/KAFKA-13859
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.1
>Reporter: Oliver Payne
>Assignee: dengziming
>Priority: Major
>
> When attempting to produce records to Kafka using a client configured with 
> SCRAM authentication, the authentication is being rejected, and the following 
> exception is thrown:
> {{org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster 
> authorization failed.}}
> I am seeing this happen with a Spring Boot service that was recently upgraded
> to 2.6.5. After looking into this, I learned that Spring Boot moved to
> kafka-clients 3.0.1 from 3.0.0 in that version. And sure enough, downgrading
> kafka-clients resolved the issue, with no changes made to the configs.
> I have also attempted to connect to a separate server with kafka-clients 
> 3.0.1, using plaintext authentication. That works fine. So the issue appears 
> to be with SCRAM authentication.
> I will note that I am attempting to connect to an AWS MSK instance. We use 
> SCRAM-SHA-512 as our SASL mechanism, using the basic {{ScramLoginModule}}.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] ijuma commented on a diff in pull request #12114: MINOR: Note that slf4j-log4j in version 1.7.35+ should be used

2022-05-02 Thread GitBox


ijuma commented on code in PR #12114:
URL: https://github.com/apache/kafka/pull/12114#discussion_r862968313


##
docs/upgrade.html:
##
@@ -73,7 +73,11 @@ Notable changes in 3
 via Connect worker and/or connector configuration. Connect may 
enable idempotent producers
 by default in a future major release.
 Kafka has replaced log4j and slf4j-log4j12 with reload4j and 
slf4j-reload4j due to security concerns.

Review Comment:
   We should also update the same text in the 3.1 branch once we merge this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12114: MINOR: Note that slf4j-log4j in version 1.7.35+ should be used

2022-05-02 Thread GitBox


ijuma commented on code in PR #12114:
URL: https://github.com/apache/kafka/pull/12114#discussion_r862967880


##
docs/upgrade.html:
##
@@ -73,7 +73,11 @@ Notable changes in 3
 via Connect worker and/or connector configuration. Connect may 
enable idempotent producers
 by default in a future major release.
 Kafka has replaced log4j and slf4j-log4j12 with reload4j and 
slf4j-reload4j due to security concerns.

Review Comment:
   Maybe we can say something like:
   
   > Kafka has replaced log4j and slf4j-log4j12 with reload4j and slf4j-reload4j due to security concerns.
   > This only affects modules that specify a logging backend (`connect-runtime` and `kafka-tools` are two such
   > examples). A number of modules, including `kafka-clients`, leave it to the application to specify the logging
   > backend. More information can be found at <a href="https://reload4j.qos.ch">reload4j</a>.
   > Projects that depend on the affected modules from the Kafka project should use
   > <a href="https://www.slf4j.org/manual.html#swapping">slf4j-log4j12</a> version 1.7.35 or above or
   > slf4j-reload4j to avoid
   > <a href="https://www.slf4j.org/codes.html#no_tlm">possible compatibility issues</a> originating from the logging framework.
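   For a consuming application, the suggested swap might look like the following build fragment. This is a hedged sketch: the artifact coordinates are the standard SLF4J ones, and the versions shown are examples, not a Kafka requirement.

   ```groovy
   // Hypothetical Gradle fragment for an application depending on kafka-clients;
   // versions are examples only.
   dependencies {
       implementation 'org.apache.kafka:kafka-clients:3.2.0'
       // Either a recent slf4j-log4j12 ...
       // runtimeOnly 'org.slf4j:slf4j-log4j12:1.7.36'
       // ... or the reload4j-backed binding:
       runtimeOnly 'org.slf4j:slf4j-reload4j:1.7.36'
   }
   ```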



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

2022-05-02 Thread GitBox


C0urante commented on PR #11773:
URL: https://github.com/apache/kafka/pull/11773#issuecomment-1115054516

   @tombentley @mimaison I'd really like it if we could confirm the intended 
direction for this API. I'm willing to go whichever direction you believe is 
best, but (as Tom has noted) given that this is fairly green-field for Connect, 
I want to make sure that we consider our options carefully and set a good 
precedent for future APIs like this. If you believe we've done our due 
diligence, I'm happy to implement whatever approach you believe is best; please 
just confirm what you would like to see here if there are still any 
reservations about the current state of the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530788#comment-17530788
 ] 

Bruno Cadonna commented on KAFKA-13864:
---

[~frosiere] When you write a KIP you need to follow the process described in 
the [KIP 
doc|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]
 under section "Process". Otherwise we will lose track of the KIP and nobody 
will ever read your KIP and approve it. I incremented the "Next KIP Number" for 
you. Now you need to add your KIP at the end of the table in section "KIPs 
under discussion". Then you need to start a [DISCUSSION] thread on the dev 
mailing list.

> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Assignee: lqjacklee
>Priority: Major
>  Labels: needs-kip
> Attachments: interceptor_constructor_client.patch
>
>
> To allow implementing Spring managed interceptors for producers and consumers,
> [https://github.com/spring-projects/spring-kafka/issues/2244]
> a new constructor should be added in KafkaProducer
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
> Serializer valueSerializer){code}
> the visibility of one constructor of KafkaConsumer should also move from 
> default to public.
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
> Deserializer valueDeserializer) {code}
> see the current implementation 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671
> This issue is quite blocking, so any other alternative or proposal would be
> more than welcome.
> Kafka streams is not concerned by this issue as the KafkaStreams object is 
> already exposing a constructor taking a StreamsConfig object.
> Thanks for considering this issue.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] cadonna commented on pull request #12114: MINOR: Note that slf4j-log4j in version 1.7.35+ should be used

2022-05-02 Thread GitBox


cadonna commented on PR #12114:
URL: https://github.com/apache/kafka/pull/12114#issuecomment-1115025366

   @ijuma 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna opened a new pull request, #12114: MINOR: Note that slf4j-log4j in version 1.7.35+ should be used

2022-05-02 Thread GitBox


cadonna opened a new pull request, #12114:
URL: https://github.com/apache/kafka/pull/12114

   Adds a note to the upgrade notes to use slf4j-log4j version
   1.7.35+ [1] or slf4j-reload4j to avoid possible compatibility issues
   originating from the logging framework [2].
   
   [1] https://www.slf4j.org/manual.html#swapping
   [2] https://www.slf4j.org/codes.html#no_tlm
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing

2022-05-02 Thread GitBox


YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862388033


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##
@@ -110,14 +114,18 @@ public Map performAssignment(String 
leaderId, String protoco
 : CONNECT_PROTOCOL_V1;
 
 Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator);
+Map assignments;
 if (leaderOffset == null) {
-Map assignments = fillAssignments(
+assignments = fillAssignments(
 memberConfigs.keySet(), Assignment.CONFIG_MISMATCH,
-leaderId, memberConfigs.get(leaderId).url(), maxOffset, 
Collections.emptyMap(),
-Collections.emptyMap(), Collections.emptyMap(), 0, 
protocolVersion);
-return serializeAssignments(assignments);
+leaderId, memberConfigs.get(leaderId).url(), maxOffset,
+ClusterAssignment.EMPTY, 0, protocolVersion);
+} else {
+assignments = performTaskAssignment(leaderId, leaderOffset, 
memberConfigs, coordinator, protocolVersion);
 }
-return performTaskAssignment(leaderId, leaderOffset, memberConfigs, 
coordinator, protocolVersion);
+Map result = serializeAssignments(assignments);
+log.debug("Finished assignment");

Review Comment:
   @C0urante
   
   This works with `Map assignments` (plural).
   So maybe this?
   
   ```suggestion
   log.debug("Finished assignments");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-05-02 Thread GitBox


divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862915029


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -34,22 +34,38 @@
  */
 public abstract class SampledStat implements MeasurableStat {
 
-private double initialValue;
+private final double initialValue;
 private int current = 0;
+
 protected List samples;
 
 public SampledStat(double initialValue) {
 this.initialValue = initialValue;
 this.samples = new ArrayList<>(2);
 }
 
+/**
+ * {@inheritDoc}
+ *
+ * On every record, do the following:
+ * 1. Check if the current window has expired
+ * 2. If yes, then advance the current pointer to new window. The start 
time of the new window is set to nearest
+ *possible starting point for the new window. The nearest starting 
point occurs at config.timeWindowMs intervals
+ *from the end time of last known window.
+ * 3. Update the recorded value for the current window
+ * 4. Increment the event count
+ */
 @Override
-public void record(MetricConfig config, double value, long timeMs) {
-Sample sample = current(timeMs);
-if (sample.isComplete(timeMs, config))
-sample = advance(config, timeMs);
-update(sample, config, value, timeMs);
-sample.eventCount += 1;
+public void record(MetricConfig config, double value, long 
recordingTimeMs) {
+Sample sample = current(recordingTimeMs);
+if (sample.isComplete(recordingTimeMs, config)) {
+final long previousWindowStartTime = sample.getLastWindowMs();
+sample = advance(config, recordingTimeMs);

Review Comment:
   Modified the code to make it more readable. It is not exactly what you suggested, but it should read better now.
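For readers skimming the thread, the rotation logic described in the Javadoc of `record` in the diff above can be sketched independently of Kafka's internals. This is illustrative only: the real `SampledStat` keeps a small ring buffer of reusable samples rather than an append-only list, and the window length is read from `MetricConfig`:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of time-window sample rotation (not the real SampledStat).
class WindowedStat {
    static final long WINDOW_MS = 1000L; // assumed fixed window length

    static class Sample {
        long windowStartMs;
        double value;
        long eventCount;
        Sample(long start) { this.windowStartMs = start; }
    }

    private final List<Sample> samples = new ArrayList<>();
    private int current = 0;

    void record(double value, long nowMs) {
        if (samples.isEmpty()) samples.add(new Sample(nowMs));
        Sample sample = samples.get(current);
        if (nowMs - sample.windowStartMs >= WINDOW_MS) {
            // Advance to a new window; align its start to the nearest multiple
            // of WINDOW_MS from the previous window, as the Javadoc describes.
            long elapsed = nowMs - sample.windowStartMs;
            long newStart = sample.windowStartMs + (elapsed / WINDOW_MS) * WINDOW_MS;
            samples.add(new Sample(newStart));
            current = samples.size() - 1;
            sample = samples.get(current);
        }
        sample.value += value;
        sample.eventCount += 1; // increment the event count
    }

    long totalEvents() {
        return samples.stream().mapToLong(s -> s.eventCount).sum();
    }
}
```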



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-05-02 Thread GitBox


divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862909115


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -138,6 +170,46 @@ public boolean isComplete(long timeMs, MetricConfig 
config) {
 return timeMs - lastWindowMs >= config.timeWindowMs() || 
eventCount >= config.eventWindow();
 }
 
+public boolean isActive() {

Review Comment:
   All changes to public API have been reverted. Addressing the core fix does 
not require any changes to the public APIs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-05-02 Thread GitBox


divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862908720


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java:
##
@@ -52,10 +51,6 @@ public Rate(TimeUnit unit, SampledStat stat) {
 this.unit = unit;
 }
 
-public String unitName() {

Review Comment:
   All changes to public API have been reverted. Addressing the core fix does 
not require any changes to the public APIs.



##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -110,25 +127,40 @@ public String toString() {
 protected void purgeObsoleteSamples(MetricConfig config, long now) {
 long expireAge = config.samples() * config.timeWindowMs();
 for (Sample sample : samples) {
-if (now - sample.lastWindowMs >= expireAge)
+if (now - sample.getLastWindowMs() >= expireAge)
 sample.reset(now);
 }
 }
 
 protected static class Sample {
-public double initialValue;
-public long eventCount;
-public long lastWindowMs;
-public double value;
+private double initialValue;

Review Comment:
   All changes to public API have been reverted. Addressing the core fix does 
not require any changes to the public APIs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-05-02 Thread GitBox


divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862908241


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -110,25 +127,40 @@ public String toString() {
 protected void purgeObsoleteSamples(MetricConfig config, long now) {
 long expireAge = config.samples() * config.timeWindowMs();
 for (Sample sample : samples) {
-if (now - sample.lastWindowMs >= expireAge)
+if (now - sample.getLastWindowMs() >= expireAge)
 sample.reset(now);
 }
 }
 
 protected static class Sample {
-public double initialValue;
-public long eventCount;
-public long lastWindowMs;
-public double value;
+private double initialValue;
+private long eventCount;
+private long lastWindowMs;
+private double value;
+
+/**
+ * A Sample object could be re-used in a ring buffer to store future 
samples for space efficiency.
+ * Thus, a sample could be in either of the following lifecycle states:
+ * NOT_INITIALIZED: Sample has not been initialized.
+ * ACTIVE: Sample has values and is currently
+ * RESET: Sample has been reset and the object is not destroyed so 
that it could be used for storing future
+ *samples.
+ */
+private enum LifecycleState {

Review Comment:
   This code has been removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-05-02 Thread GitBox


divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862908032


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -87,8 +103,9 @@ public Sample oldest(long now) {
 Sample oldest = this.samples.get(0);
 for (int i = 1; i < this.samples.size(); i++) {
 Sample curr = this.samples.get(i);
-if (curr.lastWindowMs < oldest.lastWindowMs)
+if ((curr.getLastWindowMs() < oldest.getLastWindowMs()) && 
curr.isActive()) { // only consider active samples

Review Comment:
   I have completely got rid of these changes. They were more useful for 
defensive programming, but changing the public APIs would have required a KIP 
and also complicated this code review. I will file a separate PR to add these 
defensive checks back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-05-02 Thread GitBox


divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862906685


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java:
##
@@ -68,28 +63,61 @@ public double measure(MetricConfig config, long now) {
 }
 
 public long windowSize(MetricConfig config, long now) {
-// purge old samples before we compute the window size
+// Purge obsolete samples. Obsolete samples are the ones which are not 
relevant to the current calculation
+// because their creation time is outside (before) the duration of 
time window used to calculate rate.
 stat.purgeObsoleteSamples(config, now);
 
 /*
  * Here we check the total amount of time elapsed since the oldest 
non-obsolete window.
- * This give the total windowSize of the batch which is the time used 
for Rate computation.
- * However, there is an issue if we do not have sufficient data for 
e.g. if only 1 second has elapsed in a 30 second
- * window, the measured rate will be very high.
- * Hence we assume that the elapsed time is always N-1 complete 
windows plus whatever fraction of the final window is complete.
+ * This gives the duration of computation time window which used to 
calculate Rate.
+ *
+ * For scenarios when rate computation is performed after at least 
`config.samples` have completed,
+ * the duration of computation time window is determined by:
+ *  window duration = (now - start time of oldest non-obsolete 
window)
+ *
+ * ## Special case: First ever window
+ * A special scenario occurs when rate calculation is performed before 
at least `config.samples` have completed
+ * (e.g. if only 1 second has elapsed in a 30 second). In such a 
scenario, window duration would be equal to the
+ * time elapsed in the current window (since oldest non-obsolete 
window is current window). This leads to the
+ * following values for rate. Consider the following example:
+ *  config.timeWindowMs() = 1s
+ *  config.samples() = 2
+ *  Record events (E) at timestamps:
+ *  E1 = CurrentTimeStamp (T1)
+ *  E2 = T1 + 30ms
+ *  E2 = T1 + 60ms

Review Comment:
   Fixed. Thanks for pointing this out.
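The special case discussed in this comment (the rate blowing up when little time has elapsed) can be illustrated with a toy calculation. This is a simplified sketch of the idea in the quoted comment, not the actual `Rate` implementation:

```java
// Toy illustration of the "first ever window" rate overestimation and the
// minimum-window correction; numbers and method names are hypothetical.
class RateWindowExample {
    // Rate over exactly the elapsed time: blows up when elapsedMs is small.
    static double naiveRatePerSec(double total, long elapsedMs) {
        return total * 1000.0 / elapsedMs;
    }

    // Assume the window spans at least (samples - 1) complete windows, in the
    // spirit of the comment in Rate.windowSize(); simplified, not the real code.
    static double boundedRatePerSec(double total, long elapsedMs,
                                    int samples, long timeWindowMs) {
        long minWindowMs = (long) (samples - 1) * timeWindowMs;
        long windowMs = Math.max(elapsedMs, minWindowMs);
        return total * 1000.0 / windowMs;
    }

    public static void main(String[] args) {
        // 3 events in the first 60 ms, with config.samples() = 2 and a 1 s window:
        System.out.println(naiveRatePerSec(3, 60));            // 50.0 events/s, wildly high
        System.out.println(boundedRatePerSec(3, 60, 2, 1000)); // 3.0 events/s
    }
}
```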



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-05-02 Thread GitBox


divijvaidya commented on PR #12045:
URL: https://github.com/apache/kafka/pull/12045#issuecomment-1114895474

   @mimaison Thinking about it, I can actually reduce the code changes so that 
no modifications to any public interface are made. Do you still think a KIP is 
required for this change in that case? (I am new to Kafka, so I am not fully 
sure what qualifies for a KIP vs. what doesn't)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chromy96 commented on a diff in pull request #11916: KAFKA-12703; Allow unencrypted private keys when using PEM files

2022-05-02 Thread GitBox


chromy96 commented on code in PR #11916:
URL: https://github.com/apache/kafka/pull/11916#discussion_r862846428


##
clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java:
##
@@ -291,7 +289,14 @@ public void testPemKeyStoreFileNoKeyPassword() throws 
Exception {
 configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
 pemFilePath(pemAsConfigValue(KEY, CERTCHAIN).value()));
 configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
DefaultSslEngineFactory.PEM_TYPE);
-assertThrows(InvalidConfigurationException.class, () -> 
factory.configure(configs));
+configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, null);
+factory.configure(configs);
+
+KeyStore keyStore = factory.keystore();
+List aliases = Collections.list(keyStore.aliases());
+assertEquals(Collections.singletonList("kafka"), aliases);
+assertNotNull(keyStore.getCertificate("kafka"), "Certificate not 
loaded");
+assertNotNull(keyStore.getKey("kafka",  null), "Private key not 
loaded");

Review Comment:
   @dajac I found two places where the mandatory key is mentioned. I've 
updated this in the latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chromy96 commented on a diff in pull request #11916: KAFKA-12703; Allow unencrypted private keys when using PEM files

2022-05-02 Thread GitBox


chromy96 commented on code in PR #11916:
URL: https://github.com/apache/kafka/pull/11916#discussion_r862845752


##
clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java:
##
@@ -291,7 +289,14 @@ public void testPemKeyStoreFileNoKeyPassword() throws 
Exception {
 configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
 pemFilePath(pemAsConfigValue(KEY, CERTCHAIN).value()));
 configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
DefaultSslEngineFactory.PEM_TYPE);
-assertThrows(InvalidConfigurationException.class, () -> 
factory.configure(configs));
+configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, null);
+factory.configure(configs);
+
+KeyStore keyStore = factory.keystore();
+List aliases = Collections.list(keyStore.aliases());
+assertEquals(Collections.singletonList("kafka"), aliases);
+assertNotNull(keyStore.getCertificate("kafka"), "Certificate not 
loaded");
+assertNotNull(keyStore.getKey("kafka",  null), "Private key not 
loaded");

Review Comment:
   Thanks for the comment. Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-05-02 Thread GitBox


divijvaidya commented on PR #12045:
URL: https://github.com/apache/kafka/pull/12045#issuecomment-1114879179

   Thanks for checking @mimaison. As I explained above, the test is flaky 
because the logic of the `Rate` computation has a bug, and hence in the worst 
case it can exceed whatever thresholds we set on the assertion. Increasing the 
test threshold would be a hacky fix, equivalent to disabling the test 
altogether.
   
   Per your suggestion, let me start a discussion on the mailing list and 
create a KIP if we reach a consensus. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-05-02 Thread GitBox


mimaison commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862834421


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -110,25 +127,40 @@ public String toString() {
 protected void purgeObsoleteSamples(MetricConfig config, long now) {
 long expireAge = config.samples() * config.timeWindowMs();
 for (Sample sample : samples) {
-if (now - sample.lastWindowMs >= expireAge)
+if (now - sample.getLastWindowMs() >= expireAge)
 sample.reset(now);
 }
 }
 
 protected static class Sample {
-public double initialValue;
-public long eventCount;
-public long lastWindowMs;
-public double value;
+private double initialValue;

Review Comment:
   This is also part of the public API, so we shouldn't be changing these fields



##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -138,6 +170,46 @@ public boolean isComplete(long timeMs, MetricConfig 
config) {
 return timeMs - lastWindowMs >= config.timeWindowMs() || 
eventCount >= config.eventWindow();
 }
 
+public boolean isActive() {

Review Comment:
   Again because it's public API we can't add new public methods without having 
a KIP



##
clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java:
##
@@ -52,10 +51,6 @@ public Rate(TimeUnit unit, SampledStat stat) {
 this.unit = unit;
 }
 
-public String unitName() {

Review Comment:
   `Rate` is part of the public API 
(https://kafka.apache.org/31/javadoc/org/apache/kafka/common/metrics/stats/Rate.html)
 so we don't want to remove this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-05-02 Thread GitBox


Hangleton commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862786317


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -87,8 +103,9 @@ public Sample oldest(long now) {
 Sample oldest = this.samples.get(0);
 for (int i = 1; i < this.samples.size(); i++) {
 Sample curr = this.samples.get(i);
-if (curr.lastWindowMs < oldest.lastWindowMs)
+if ((curr.getLastWindowMs() < oldest.getLastWindowMs()) && 
curr.isActive()) { // only consider active samples

Review Comment:
   Is the `isActive` check really required? Before the oldest sample is computed, 
expired samples are reset, which sets `lastWindowMs` equal to `now`. 
   
   Marginal note: this assumes at least one sample is active (that is if all 
samples between 1 and `samples.size()` are not active, the first sample in the 
list has to be active and would be the current sample).
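   The argument above can be sketched in a few lines (hypothetical class and
method names, not Kafka's actual `SampledStat`): because the purge step resets an
expired sample's `lastWindowMs` to `now`, an expired sample can never compare as
older than an active one, so an extra `isActive` check in the oldest-sample loop
would be redundant.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the purge-then-oldest logic.
public class OldestSampleSketch {
    static class Sample {
        long lastWindowMs;
        Sample(long lastWindowMs) { this.lastWindowMs = lastWindowMs; }
        void reset(long now) { this.lastWindowMs = now; }
    }

    // Mirrors purgeObsoleteSamples: expired samples get lastWindowMs = now.
    static void purge(List<Sample> samples, long now, long expireAge) {
        for (Sample s : samples)
            if (now - s.lastWindowMs >= expireAge)
                s.reset(now);
    }

    // After purging, the plain "<" comparison already skips expired samples,
    // because their lastWindowMs equals now (the largest value in the list).
    static Sample oldest(List<Sample> samples) {
        Sample oldest = samples.get(0);
        for (int i = 1; i < samples.size(); i++)
            if (samples.get(i).lastWindowMs < oldest.lastWindowMs)
                oldest = samples.get(i);
        return oldest;
    }

    public static void main(String[] args) {
        List<Sample> samples = new ArrayList<>();
        samples.add(new Sample(0));    // expired: will be reset to now
        samples.add(new Sample(900));  // active, oldest after purge
        samples.add(new Sample(950));  // active
        purge(samples, 1000, 500);
        System.out.println(oldest(samples).lastWindowMs); // prints 900
    }
}
```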






[jira] [Comment Edited] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530722#comment-17530722
 ] 

François Rosière edited comment on KAFKA-13864 at 5/2/22 12:52 PM:
---

[KIP-832|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882578]
 has been created.


was (Author: JIRAUSER288866):
KIP-832 has been created.

> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Assignee: lqjacklee
>Priority: Major
>  Labels: needs-kip
> Attachments: interceptor_constructor_client.patch
>
>
> To allow implementing Spring managed interceptors for producers and consumers,
> [https://github.com/spring-projects/spring-kafka/issues/2244]
> a new constructor should be added in KafkaProducer
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
> Serializer valueSerializer){code}
> the visibility of one constructor of KafkaConsumer should also move from 
> default to public.
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
> Deserializer valueDeserializer) {code}
> see the current implementation 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671
> This issue is quite blocking , so, any other alternative or proposal would be 
> more than welcome.
> Kafka streams is not concerned by this issue as the KafkaStreams object is 
> already exposing a constructor taking a StreamsConfig object.
> Thanks for considering this issue.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530722#comment-17530722
 ] 

François Rosière commented on KAFKA-13864:
--

KIP-832 has been created.



[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530716#comment-17530716
 ] 

Bruno Cadonna commented on KAFKA-13864:
---

[~frosiere] You should be all set now!



[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530707#comment-17530707
 ] 

François Rosière commented on KAFKA-13864:
--

Make sense... frosiere is my account name on both Jira and Confluence. Thanks



[GitHub] [kafka] Hangleton commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-05-02 Thread GitBox


Hangleton commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862786317


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -87,8 +103,9 @@ public Sample oldest(long now) {
 Sample oldest = this.samples.get(0);
 for (int i = 1; i < this.samples.size(); i++) {
 Sample curr = this.samples.get(i);
-if (curr.lastWindowMs < oldest.lastWindowMs)
+if ((curr.getLastWindowMs() < oldest.getLastWindowMs()) && 
curr.isActive()) { // only consider active samples

Review Comment:
   Does the `isActive` really required? Before the oldest sample is computed, 
expired samples are reset which brings the `lastWindonMs` equal to `now`. 
   
   Marginal note: this assumes at least one sample is active (that is if all 
samples between 1 and `samples.size()` are not active, the first sample in the 
list has to be active and would be the current sample).



##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -110,25 +127,40 @@ public String toString() {
 protected void purgeObsoleteSamples(MetricConfig config, long now) {
 long expireAge = config.samples() * config.timeWindowMs();
 for (Sample sample : samples) {
-if (now - sample.lastWindowMs >= expireAge)
+if (now - sample.getLastWindowMs() >= expireAge)
 sample.reset(now);
 }
 }
 
 protected static class Sample {
-public double initialValue;
-public long eventCount;
-public long lastWindowMs;
-public double value;
+private double initialValue;
+private long eventCount;
+private long lastWindowMs;
+private double value;
+
+/**
+ * A Sample object could be re-used in a ring buffer to store future 
samples for space efficiency.
+ * Thus, a sample could be in either of the following lifecycle states:
+ * NOT_INITIALIZED: Sample has not been initialized.
 + * ACTIVE: Sample has values and is currently active.
+ * RESET: Sample has been reset and the object is not destroyed so 
that it could be used for storing future
+ *samples.
+ */
+private enum LifecycleState {

Review Comment:
   Cf. comment on calculation of the oldest sample - not sure this is required 
given `reset` update the `lastWindowMs` of the sample to the "current" 
timestamp at the time of the reset.



##
clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java:
##
@@ -68,28 +63,61 @@ public double measure(MetricConfig config, long now) {
 }
 
 public long windowSize(MetricConfig config, long now) {
-// purge old samples before we compute the window size
+// Purge obsolete samples. Obsolete samples are the ones which are not 
relevant to the current calculation
+// because their creation time is outside (before) the duration of 
time window used to calculate rate.
 stat.purgeObsoleteSamples(config, now);
 
 /*
  * Here we check the total amount of time elapsed since the oldest 
non-obsolete window.
- * This give the total windowSize of the batch which is the time used 
for Rate computation.
- * However, there is an issue if we do not have sufficient data for 
e.g. if only 1 second has elapsed in a 30 second
- * window, the measured rate will be very high.
- * Hence we assume that the elapsed time is always N-1 complete 
windows plus whatever fraction of the final window is complete.
 + * This gives the duration of the computation time window used to 
calculate the Rate.
+ *
+ * For scenarios when rate computation is performed after at least 
`config.samples` have completed,
+ * the duration of computation time window is determined by:
+ *  window duration = (now - start time of oldest non-obsolete 
window)
+ *
+ * ## Special case: First ever window
+ * A special scenario occurs when rate calculation is performed before 
at least `config.samples` have completed
 + * (e.g. if only 1 second has elapsed in a 30 second window). In such a 
scenario, the window duration would be equal to the
+ * time elapsed in the current window (since oldest non-obsolete 
window is current window). This leads to the
+ * following values for rate. Consider the following example:
+ *  config.timeWindowMs() = 1s
+ *  config.samples() = 2
+ *  Record events (E) at timestamps:
+ *  E1 = CurrentTimeStamp (T1)
+ *  E2 = T1 + 30ms
+ *  E2 = T1 + 60ms

Review Comment:
   E2 -> E3
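   The special case discussed in this comment thread can be made concrete with a
small sketch (hypothetical names, not the actual `Rate` implementation): with a
1 s window and 2 samples, 3 events in the first 60 ms would naively measure
3 / 0.06 s = 50 events/s, while assuming N-1 complete windows plus the elapsed
fraction of the current window gives 3 / 1.06 s ≈ 2.83 events/s.

```java
// Hypothetical sketch of the "N-1 complete windows" adjustment.
public class RateWindowSketch {

    // elapsedMs = now - start of the oldest non-obsolete window.
    static long windowSizeMs(long elapsedMs, int samples, long timeWindowMs) {
        long numFullWindows = elapsedMs / timeWindowMs;
        long minFullWindows = samples - 1;
        // If fewer than samples-1 windows have completed, pad the elapsed
        // time so a nearly empty first window does not inflate the rate.
        if (numFullWindows < minFullWindows)
            elapsedMs += (minFullWindows - numFullWindows) * timeWindowMs;
        return elapsedMs;
    }

    public static void main(String[] args) {
        // 3 events in the first 60 ms of a 1 s window, 2 samples configured.
        long windowMs = windowSizeMs(60, 2, 1000);
        System.out.println(windowMs); // prints 1060
        // Rate becomes 3 / 1.06 s instead of an inflated 3 / 0.06 s.
        System.out.println(3 / (windowMs / 1000.0));
    }
}
```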



##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -34,22 +34,38 @@
  */
 public abstract class SampledStat implements MeasurableStat {
 
-  

[jira] [Comment Edited] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530703#comment-17530703
 ] 

Bruno Cadonna edited comment on KAFKA-13864 at 5/2/22 12:06 PM:


Any change that impacts the public interface of a class for which the build 
generates Javadocs is considered a major change and needs a KIP. The [KIP 
doc|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]
 is quite explicit about it.

Regarding the account: Let me know your account name and I can give you the 
needed permissions.


was (Author: cadonna):
Any change that impacts the public interface of a class for which the build 
generates Javadocs are considered a major change and needs a KIP. The [KIP 
doc|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]
 is quite explicit about it.

Regarding the account: Let me know your account name and I can give you the 
needed permissions.



[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530703#comment-17530703
 ] 

Bruno Cadonna commented on KAFKA-13864:
---

Any change that impacts the public interface of a class for which the build 
generates Javadocs is considered a major change and needs a KIP. The [KIP 
doc|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]
 is quite explicit about it.

Regarding the account: Let me know your account name and I can give you the 
needed permissions.



[jira] [Comment Edited] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530697#comment-17530697
 ] 

François Rosière edited comment on KAFKA-13864 at 5/2/22 11:59 AM:
---

KIP looks overkill in this specific case as we simply create a new constructor 
+ increase the visibility of one existing constructor, no impact on existing 
API and the usage is still the exact same. But I can create one when I will 
have the right setup/accounts.


was (Author: JIRAUSER288866):
KIP looks overkill in this specific case as we simply create a new constructor 
+ increase the visibility of one existing constructor, no impact on existing 
API and the usage is still the exact same. But let me create it...



[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530697#comment-17530697
 ] 

François Rosière commented on KAFKA-13864:
--

KIP looks overkill in this specific case as we simply create a new constructor 
+ increase the visibility of one existing constructor, no impact on existing 
API and the usage is still the exact same. But let me create it...



[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530694#comment-17530694
 ] 

Bruno Cadonna commented on KAFKA-13864:
---

[~Jack-Lee] Could you please open a GitHub PR against trunk? Reviewing a PR is 
simpler than an attached patch. The title of the PR should start with 
"KAFKA-13864:". 

We still need a KIP before we can merge the PR.



[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread lqjacklee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530689#comment-17530689
 ] 

lqjacklee commented on KAFKA-13864:
---

[~frosiere] [~cadonna] please help review the patch. If a KIP is needed, could I 
take on that task?



[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread lqjacklee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lqjacklee updated KAFKA-13864:
--
Attachment: interceptor_constructor_client.patch



[GitHub] [kafka] divijvaidya commented on pull request #12113: MINOR: Small cleanups in connect/mirror

2022-05-02 Thread GitBox


divijvaidya commented on PR #12113:
URL: https://github.com/apache/kafka/pull/12113#issuecomment-1114739908

   Hey @mimaison, one of the flaky tests failing for this PR is 
`testListenerConnectionRateLimitWhenActualRateAboveLimit() – 
kafka.network.ConnectionQuotasTest` which I have fixed as part of 
https://github.com/apache/kafka/pull/12045 
   
   If you get a chance, please review my PR and that will reduce some degree of 
flakiness from the existing test suite. 
   
   Thanks in advance!





[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530677#comment-17530677
 ] 

Bruno Cadonna commented on KAFKA-13864:
---

[~frosiere][~Jack-Lee] I think this ticket needs a KIP since it plans to change 
the public API.



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12106: KAFKA-13861: Fix the validateOnly behavior for CreatePartitions requests in KRaft mode

2022-05-02 Thread GitBox


divijvaidya commented on code in PR #12106:
URL: https://github.com/apache/kafka/pull/12106#discussion_r862747446


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -767,7 +767,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   setErrorCode(TOPIC_AUTHORIZATION_FAILED.code))
   }
 }
-controller.createPartitions(context, topics).thenApply { results =>
+controller.createPartitions(context, topics, 
request.validateOnly()).thenApply { results =>

Review Comment:
   nit
   
   you don't need the parenthesis in scala here. Simply `request.validateOnly` 
would work.



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1664,13 +1664,14 @@ public CompletableFuture 
updateFeatures(
 @Override
 public CompletableFuture> 
createPartitions(

Review Comment:
   You can handle the `validateOnly` here and downstream function calls do not 
have to know about it. This would greatly simplify the code because now 
`ReplicationControl` does not have to deal with parsing the result and handling 
the `validateOnly` flag
   
   e.g. this function implementation would change to
   ```
   if (topics.isEmpty()) {
       return CompletableFuture.completedFuture(Collections.emptyList());
   }

   return appendWriteEvent("createPartitions", context.deadlineNs(), () -> {
       final ControllerResult<List<CreatePartitionsTopicResult>> result =
           replicationControl.createPartitions(topics);
       return validateOnly ? result.withoutRecords() : result;
   });
   ```
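
   The suggested shape — decide about `validateOnly` once at the controller 
boundary and strip the uncommitted records there — can be sketched in 
isolation. The `ControllerResult` below is a hypothetical stand-in, not 
Kafka's actual class:

   ```java
import java.util.Collections;
import java.util.List;

public class Main {
    // Hypothetical stand-in for Kafka's ControllerResult: a response value plus
    // the metadata records that would be appended to the log.
    record ControllerResult<T>(List<String> records, T response) {
        ControllerResult<T> withoutRecords() {
            return new ControllerResult<>(Collections.emptyList(), response);
        }
    }

    // The boundary decides whether records are kept; the inner creation logic
    // never needs to see the validateOnly flag.
    static ControllerResult<String> createPartitions(boolean validateOnly) {
        ControllerResult<String> result =
                new ControllerResult<>(List.of("PartitionChangeRecord"), "ok");
        return validateOnly ? result.withoutRecords() : result;
    }

    public static void main(String[] args) {
        System.out.println(createPartitions(true).records().size());  // validate-only: records dropped
        System.out.println(createPartitions(false).records().size()); // real request: records kept
    }
}
   ```

   The validation logic still runs in full either way; only the decision to 
commit records differs.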



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-13864:
--
Labels: needs-kip  (was: )

> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Assignee: lqjacklee
>Priority: Major
>  Labels: needs-kip
>
> To allow implementing Spring managed interceptors for producers and consumers,
> [https://github.com/spring-projects/spring-kafka/issues/2244]
> a new constructor should be added in KafkaProducer
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
> Serializer valueSerializer){code}
> the visibility of one constructor of KafkaConsumer should also move from 
> default to public.
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
> Deserializer valueDeserializer) {code}
> see the current implementation 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671
> This issue is quite blocking, so any other alternative or proposal would be
> more than welcome.
> Kafka Streams is not affected by this issue, as the KafkaStreams object
> already exposes a constructor taking a StreamsConfig object.
> Thanks for considering this issue.
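
To make the visibility problem concrete, here is a self-contained sketch using 
stand-in classes (not Kafka's real `KafkaConsumer`/`ConsumerConfig`): the 
constructor that accepts the parsed config object is package-private, so a 
Spring-style factory in another package cannot invoke it — exactly what the 
ticket asks to change:

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.Map;

public class Main {
    // Stand-in for a parsed client config such as ConsumerConfig.
    static class ClientConfig {
        final Map<String, Object> values;
        ClientConfig(Map<String, Object> values) { this.values = values; }
    }

    static class Client {
        // Package-private: frameworks outside this package cannot call it.
        Client(ClientConfig config) { }

        // The ticket proposes a public config-based entry point like this one.
        public Client(Map<String, Object> props) { this(new ClientConfig(props)); }
    }

    public static void main(String[] args) throws Exception {
        Constructor<?> c = Client.class.getDeclaredConstructor(ClientConfig.class);
        // The config-taking constructor is not public, so external code is
        // limited to the Map/Properties-based constructors.
        System.out.println(Modifier.isPublic(c.getModifiers()));
    }
}
```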





[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

François Rosière updated KAFKA-13864:
-
Description: 
To allow implementing Spring managed interceptors for producers and consumers,

[https://github.com/spring-projects/spring-kafka/issues/2244]

a new constructor should be added in KafkaProducer
{code:java}
public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
Serializer valueSerializer){code}
the visibility of one constructor of KafkaConsumer should also move from 
default to public.
{code:java}
public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
Deserializer valueDeserializer) {code}
see the current implementation 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671

This issue is quite blocking, so any other alternative or proposal would be
more than welcome.

Kafka Streams is not affected by this issue, as the KafkaStreams object
already exposes a constructor taking a StreamsConfig object.

Thanks for considering this issue.

  was:
To allow implementing Spring managed interceptors for producers and consumers,

[https://github.com/spring-projects/spring-kafka/issues/2244]

a new constructor should be added in KafkaProducer
{code:java}
public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
Serializer valueSerializer){code}
the visibility of one constructor of KafkaConsumer should also move from 
default to public.
{code:java}
public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
Deserializer valueDeserializer) {code}
see the current implementation 
[here|[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671]]

This issue is quite blocking , so, any other alternative or proposal would be 
more than welcome.

Kafka streams is not concerned by this issue as the KafkaStreams object is 
already exposing a constructor taking a StreamsConfig object.

Thanks for considering this issue.


> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Assignee: lqjacklee
>Priority: Major
>
> To allow implementing Spring managed interceptors for producers and consumers,
> [https://github.com/spring-projects/spring-kafka/issues/2244]
> a new constructor should be added in KafkaProducer
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
> Serializer valueSerializer){code}
> the visibility of one constructor of KafkaConsumer should also move from 
> default to public.
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
> Deserializer valueDeserializer) {code}
> see the current implementation 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671
> This issue is quite blocking, so any other alternative or proposal would be
> more than welcome.
> Kafka Streams is not affected by this issue, as the KafkaStreams object
> already exposes a constructor taking a StreamsConfig object.
> Thanks for considering this issue.





[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

François Rosière updated KAFKA-13864:
-
Description: 
To allow implementing Spring managed interceptors for producers and consumers,

[https://github.com/spring-projects/spring-kafka/issues/2244]

a new constructor should be added in KafkaProducer
{code:java}
public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
Serializer valueSerializer){code}
the visibility of one constructor of KafkaConsumer should also move from 
default to public.
{code:java}
public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
Deserializer valueDeserializer) {code}
see the current implementation 
[here|[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671]]

This issue is quite blocking, so any other alternative or proposal would be
more than welcome.

Kafka Streams is not affected by this issue, as the KafkaStreams object
already exposes a constructor taking a StreamsConfig object.

Thanks for considering this issue.

  was:
To allow implementing Spring managed interceptors for producers and consumers,

[https://github.com/spring-projects/spring-kafka/issues/2244]

a new constructor should be added in KafkaProducer
{code:java}
public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
Serializer valueSerializer){code}
the visibility of one constructor of KafkaConsumer should also move from 
default to public.
{code:java}
public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
Deserializer valueDeserializer) {code}
see the current implementation [here|#L321]. 

This issue is quite blocking , so, any other alternative or proposal would be 
more than welcome.

Kafka streams is not concerned by this issue as the KafkaStreams object is 
already exposing a constructor taking a StreamsConfig object.

Thanks for considering this issue.


> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Assignee: lqjacklee
>Priority: Major
>
> To allow implementing Spring managed interceptors for producers and consumers,
> [https://github.com/spring-projects/spring-kafka/issues/2244]
> a new constructor should be added in KafkaProducer
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
> Serializer valueSerializer){code}
> the visibility of one constructor of KafkaConsumer should also move from 
> default to public.
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
> Deserializer valueDeserializer) {code}
> see the current implementation 
> [here|[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671]]
> This issue is quite blocking, so any other alternative or proposal would be
> more than welcome.
> Kafka Streams is not affected by this issue, as the KafkaStreams object
> already exposes a constructor taking a StreamsConfig object.
> Thanks for considering this issue.





[jira] [Assigned] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread lqjacklee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lqjacklee reassigned KAFKA-13864:
-

Assignee: lqjacklee

> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Assignee: lqjacklee
>Priority: Major
>
> To allow implementing Spring managed interceptors for producers and consumers,
> [https://github.com/spring-projects/spring-kafka/issues/2244]
> a new constructor should be added in KafkaProducer
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
> Serializer valueSerializer){code}
> the visibility of one constructor of KafkaConsumer should also move from 
> default to public.
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
> Deserializer valueDeserializer) {code}
> see the current implementation [here|#L321]. 
> This issue is quite blocking, so any other alternative or proposal would be
> more than welcome.
> Kafka Streams is not affected by this issue, as the KafkaStreams object
> already exposes a constructor taking a StreamsConfig object.
> Thanks for considering this issue.





[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

François Rosière updated KAFKA-13864:
-
Description: 
To allow implementing Spring managed interceptors for producers and consumers,

[https://github.com/spring-projects/spring-kafka/issues/2244]

a new constructor should be added in KafkaProducer
{code:java}
public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
Serializer valueSerializer){code}
the visibility of one constructor of KafkaConsumer should also move from 
default to public.
{code:java}
public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
Deserializer valueDeserializer) {code}
see the current implementation [here|#L321]. 

This issue is quite blocking, so any other alternative or proposal would be
more than welcome.

Kafka Streams is not affected by this issue, as the KafkaStreams object
already exposes a constructor taking a StreamsConfig object.

Thanks for considering this issue.

  was:
To allow implementing Spring managed interceptors for producers and consumers,

[https://github.com/spring-projects/spring-kafka/issues/2244]

a new constructor should be added in KafkaProducer
{code:java}
public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
Serializer valueSerializer){code}
the visibility of one constructor of KafkaConsumer should also move from 
default to public.
{code:java}
public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
Deserializer valueDeserializer) {code}
see the current implementation [here|#L321]]. 

This issue is quite blocking , so, any other alternative or proposal would be 
more than welcome.

Kafka streams is not concerned by this issue as the KafkaStreams object is 
already exposing a constructor taking a StreamsConfig object.

Thanks for considering this issue.


> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Priority: Major
>
> To allow implementing Spring managed interceptors for producers and consumers,
> [https://github.com/spring-projects/spring-kafka/issues/2244]
> a new constructor should be added in KafkaProducer
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
> Serializer valueSerializer){code}
> the visibility of one constructor of KafkaConsumer should also move from 
> default to public.
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
> Deserializer valueDeserializer) {code}
> see the current implementation [here|#L321]. 
> This issue is quite blocking, so any other alternative or proposal would be
> more than welcome.
> Kafka Streams is not affected by this issue, as the KafkaStreams object
> already exposes a constructor taking a StreamsConfig object.
> Thanks for considering this issue.





[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

François Rosière updated KAFKA-13864:
-
Description: 
To allow implementing Spring managed interceptors for producers and consumers,

[https://github.com/spring-projects/spring-kafka/issues/2244]

a new constructor should be added in KafkaProducer

 
{code:java}
public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
Serializer valueSerializer){code}
 

the visibility of one constructor of KafkaConsumer should also move from 
default to public.

 
{code:java}
public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
Deserializer valueDeserializer) {code}
 

see the current implementation 
[here|[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L321]].
 

This issue is quite blocking, so any other alternative or proposal would be
more than welcome.

Kafka Streams is not affected by this issue, as the KafkaStreams object
already exposes a constructor taking a StreamsConfig object.

Thanks for considering this issue.

  was:
To allow implementing Spring managed interceptors for producers and consumers,

[https://github.com/spring-projects/spring-kafka/issues/2244]

the visibility of one constructor of KafkaProducer and KafkaConsumer should 
move from the default modifier to the public modifier.

See

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L321]

and

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671]

For the KafkaProducer, it may make more sense to define an additional 
constructor
{code:java}
 public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
Serializer valueSerializer){code}
This issue is quite blocking , so, any other alternative or proposal would be 
more than welcome.

Kafka streams is not concerned by this issue as the KafkaStreams object is 
already exposing a constructor taking a StreamsConfig object.

Thanks for considering this issue.


> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Priority: Major
>
> To allow implementing Spring managed interceptors for producers and consumers,
> [https://github.com/spring-projects/spring-kafka/issues/2244]
> a new constructor should be added in KafkaProducer
>  
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
> Serializer valueSerializer){code}
>  
> the visibility of one constructor of KafkaConsumer should also move from 
> default to public.
>  
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
> Deserializer valueDeserializer) {code}
>  
> see the current implementation 
> [here|[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L321]].
>  
> This issue is quite blocking, so any other alternative or proposal would be
> more than welcome.
> Kafka Streams is not affected by this issue, as the KafkaStreams object
> already exposes a constructor taking a StreamsConfig object.
> Thanks for considering this issue.





[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

François Rosière updated KAFKA-13864:
-
Description: 
To allow implementing Spring managed interceptors for producers and consumers,

[https://github.com/spring-projects/spring-kafka/issues/2244]

a new constructor should be added in KafkaProducer
{code:java}
public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
Serializer valueSerializer){code}
the visibility of one constructor of KafkaConsumer should also move from 
default to public.
{code:java}
public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
Deserializer valueDeserializer) {code}
see the current implementation [here|#L321]]. 

This issue is quite blocking, so any other alternative or proposal would be
more than welcome.

Kafka Streams is not affected by this issue, as the KafkaStreams object
already exposes a constructor taking a StreamsConfig object.

Thanks for considering this issue.

  was:
To allow implementing Spring managed interceptors for producers and consumers,

[https://github.com/spring-projects/spring-kafka/issues/2244]

a new constructor should be added in KafkaProducer

 
{code:java}
public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
Serializer valueSerializer){code}
 

the visibility of one constructor of KafkaConsumer should also move from 
default to public.

 
{code:java}
public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
Deserializer valueDeserializer) {code}
 

see the current implementation 
[here|[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L321]].
 

This issue is quite blocking , so, any other alternative or proposal would be 
more than welcome.

Kafka streams is not concerned by this issue as the KafkaStreams object is 
already exposing a constructor taking a StreamsConfig object.

Thanks for considering this issue.


> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Priority: Major
>
> To allow implementing Spring managed interceptors for producers and consumers,
> [https://github.com/spring-projects/spring-kafka/issues/2244]
> a new constructor should be added in KafkaProducer
> {code:java}
> public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
> Serializer valueSerializer){code}
> the visibility of one constructor of KafkaConsumer should also move from 
> default to public.
> {code:java}
> public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
> Deserializer valueDeserializer) {code}
> see the current implementation [here|#L321]]. 
> This issue is quite blocking, so any other alternative or proposal would be
> more than welcome.
> Kafka Streams is not affected by this issue, as the KafkaStreams object
> already exposes a constructor taking a StreamsConfig object.
> Thanks for considering this issue.





[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor

2022-05-02 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

François Rosière updated KAFKA-13864:
-
Description: 
To allow implementing Spring managed interceptors for producers and consumers,

[https://github.com/spring-projects/spring-kafka/issues/2244]

the visibility of one constructor of KafkaProducer and KafkaConsumer should 
move from the default modifier to the public modifier.

See

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L321]

and

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671]

For the KafkaProducer, it may make more sense to define an additional 
constructor
{code:java}
 public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
Serializer valueSerializer){code}
This issue is quite blocking, so any other alternative or proposal would be
more than welcome.

Kafka Streams is not affected by this issue, as the KafkaStreams object
already exposes a constructor taking a StreamsConfig object.

Thanks for considering this issue.

  was:
To allow implementing Spring managed interceptors for producers and consumers,

[https://github.com/spring-projects/spring-kafka/issues/2244]

the visibility of one constructor of KafkaProducer and KafkaConsumer should 
move from the default modifier to the public modifier.

See

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L321]

and

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671]

For the KafkaProducer, it may make more sense to expose the following 
constructor
{code:java}
 public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
Serializer valueSerializer){code}
Any other alternative or proposal would be more than welcome.

Kafka streams are not concerned by this issues as the KafkaStreams object is 
already exposing a constructor taking a StreamsConfig object.

Thanks for considering this issue.


> Change the visibility of a KafkaProducer and KafkaConsumer constructor
> --
>
> Key: KAFKA-13864
> URL: https://issues.apache.org/jira/browse/KAFKA-13864
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: François Rosière
>Priority: Major
>
> To allow implementing Spring managed interceptors for producers and consumers,
> [https://github.com/spring-projects/spring-kafka/issues/2244]
> the visibility of one constructor of KafkaProducer and KafkaConsumer should 
> move from the default modifier to the public modifier.
> See
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L321]
> and
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671]
> For the KafkaProducer, it may make more sense to define an additional 
> constructor
> {code:java}
>  public KafkaProducer(ProducerConfig config, Serializer keySerializer, 
> Serializer valueSerializer){code}
> This issue is quite blocking, so any other alternative or proposal would be
> more than welcome.
> Kafka Streams is not affected by this issue, as the KafkaStreams object
> already exposes a constructor taking a StreamsConfig object.
> Thanks for considering this issue.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #12112: MINOR: Fix flaky testDescribeUnderReplicatedPartitions

2022-05-02 Thread GitBox


divijvaidya commented on code in PR #12112:
URL: https://github.com/apache/kafka/pull/12112#discussion_r862714992


##
core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala:
##
@@ -586,11 +586,14 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 try {
   killBroker(0)
   val aliveServers = brokers.filterNot(_.config.brokerId == 0)
-  TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
+  TestUtils.waitUntilTrue(

Review Comment:
   Can we alternatively use one of the existing methods in TestUtils to 
validate that the topic partition's ISR contains the remaining 5 brokers? E.g. 
`TestUtils.waitForBrokersInIsr` could validate that the topic partition's 
metadata shows the expected number of brokers in the ISR even after one of the 
brokers is terminated.
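
   Both helpers mentioned above reduce to the same condition-polling idiom. A 
generic sketch of what a `waitUntilTrue`-style helper does (simplified 
signature; Kafka's actual `TestUtils` is Scala and takes more parameters):

   ```java
import java.util.function.BooleanSupplier;

public class Main {
    // Poll a condition until it holds or the deadline passes, then fail loudly.
    static void waitUntilTrue(BooleanSupplier condition, String failureMessage,
                              long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) return;
            Thread.sleep(50); // back off between checks instead of busy-spinning
        }
        throw new AssertionError(failureMessage);
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // The condition becomes true after ~200 ms, well inside the timeout.
        waitUntilTrue(() -> System.currentTimeMillis() - start > 200,
                "condition never became true", 5_000);
        System.out.println("condition held");
    }
}
   ```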






[jira] [Resolved] (KAFKA-13865) Fix ResponseSendTimeMs metric in RequestChannel.scala was removed repeatedly

2022-05-02 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13865.
---
Fix Version/s: 3.3.0
   Resolution: Fixed

>  Fix ResponseSendTimeMs metric  in RequestChannel.scala  was removed 
> repeatedly
> ---
>
> Key: KAFKA-13865
> URL: https://issues.apache.org/jira/browse/KAFKA-13865
> Project: Kafka
>  Issue Type: Bug
>Reporter: zhaobo
>Priority: Minor
> Fix For: 3.3.0
>
>
> The ResponseSendTimeMs metric was removed in line 576, but we removed it again 
> in line 578.
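
As a toy illustration of the bug shape (using a plain map as a hypothetical 
metrics registry, not Kafka's actual `RequestChannel` code), the second removal 
is a silent no-op, which is why the duplicate line went unnoticed:

```java
import java.util.HashMap;
import java.util.Map;

public class Main {
    public static void main(String[] args) {
        Map<String, Object> metrics = new HashMap<>();
        metrics.put("ResponseSendTimeMs", new Object());

        Object first = metrics.remove("ResponseSendTimeMs");  // removes the metric
        Object second = metrics.remove("ResponseSendTimeMs"); // redundant: no-op, returns null
        System.out.println(first != null);
        System.out.println(second == null);
    }
}
```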





[GitHub] [kafka] showuon merged pull request #12111: KAFKA-13865: Fix ResponseSendTimeMs metric in RequestChannel.scala was removed repeatedly

2022-05-02 Thread GitBox


showuon merged PR #12111:
URL: https://github.com/apache/kafka/pull/12111





[GitHub] [kafka] mimaison commented on pull request #11844: KAFKA#13702 - Connect RestClient overrides response status code on request failure

2022-05-02 Thread GitBox


mimaison commented on PR #11844:
URL: https://github.com/apache/kafka/pull/11844#issuecomment-1114635179

   @Corlobin I agree with @C0urante, it would be good to have a test for this. 
Are you interested in trying the approach that has been suggested?





[GitHub] [kafka] mimaison opened a new pull request, #12113: MINOR: Small cleanups in connect/mirror

2022-05-02 Thread GitBox


mimaison opened a new pull request, #12113:
URL: https://github.com/apache/kafka/pull/12113

   - Make a few fields `final`
   - Remove unnecessary `throws`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Assigned] (KAFKA-13773) Data loss after recovery from crash due to full hard disk

2022-05-02 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-13773:
-

Assignee: Luke Chen

> Data loss after recovery from crash due to full hard disk
> -
>
> Key: KAFKA-13773
> URL: https://issues.apache.org/jira/browse/KAFKA-13773
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 2.8.0, 3.1.0, 2.8.1
>Reporter: Tim Alkemade
>Assignee: Luke Chen
>Priority: Critical
> Attachments: DiskAndOffsets.png, kafka-.zip, kafka-2.7.0vs2.8.0.zip, 
> kafka-2.8.0-crash.zip, kafka-logfiles.zip, kafka-start-to-finish.zip
>
>
> While doing some testing of Kafka on Kubernetes, the data disk for kafka 
> filled up, which led to all 3 nodes crashing. I increased the disk size for 
> all three nodes and started up kafka again (one by one, waiting for the 
> previous node to become available before starting the next one). After a 
> little while two out of three nodes had no data anymore.
> According to the logs, the log cleaner kicked in and decided that the latest 
> timestamp on those partitions was '0' (i.e. 1970-01-01), and that is older 
> than the 2 week limit specified on the topic.
>  
> {code:java}
> 2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, 
> dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files 
> LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, 
> largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0]
> 2022-03-28 12:17:19,753 INFO Deleted log 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted offset index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted time index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]{code}
> Using kafka-dump-log.sh I was able to determine that the greatest timestamp 
> in that file (before deletion) was actually 1648460888636 ( 2022-03-28, 
> 09:48:08 UTC, which is today). However, since this segment was the 
> 'latest/current' segment, much of the file is empty. The code that determines 
> the last entry (TimeIndex.lastEntryFromIndexFile) doesn't seem to know this 
> and just reads the last position in the file; the file being mostly empty 
> causes it to read 0 for that position.
> The cleaner code seems to take this into account since 
> UnifiedLog.deleteOldSegments is never supposed to delete the current segment, 
> judging by the scaladoc; however, in this case the check doesn't seem to do 
> its job. Perhaps the detected highWatermark is wrong?
> I've attached the logs and the zipped data directories (data files are over 
> 3Gb in size when unzipped)
>  
> I've encountered this problem with both kafka 2.8.1 and 3.1.0.
> I've also tried changing min.insync.replicas to 2: The issue still occurs.
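
The reported failure mode — a preallocated, mostly empty index whose last slot 
reads back as timestamp 0 — can be reproduced in miniature. The 12-byte entry 
layout below (8-byte timestamp + 4-byte relative offset) mirrors the general 
idea but is a simplified stand-in, not Kafka's actual TimeIndex code:

```java
import java.nio.ByteBuffer;

public class Main {
    public static void main(String[] args) {
        int entrySize = 12;   // 8-byte timestamp + 4-byte relative offset
        int maxEntries = 1024;
        ByteBuffer index = ByteBuffer.allocate(entrySize * maxEntries); // zero-filled

        // Only one real entry has been written, at slot 0:
        index.putLong(0, 1648460888636L); // the segment's true largest timestamp
        index.putInt(8, 42);

        // Reading the last *slot* of the file instead of the last *written* entry:
        long bogusLastTimestamp = index.getLong((maxEntries - 1) * entrySize);
        System.out.println(bogusLastTimestamp); // reads 0, i.e. 1970-01-01
    }
}
```

A timestamp of 0 looks older than any retention limit, which matches the 
cleaner deleting the segment.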


