[GitHub] [kafka] akhileshchg commented on a diff in pull request #12106: KAFKA-13861: Fix the validateOnly behavior for CreatePartitions requests in KRaft mode
akhileshchg commented on code in PR #12106: URL: https://github.com/apache/kafka/pull/12106#discussion_r863475611 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -1364,7 +1364,8 @@ Boolean isBrokerUnfenced(int brokerId) { setErrorCode(apiError.error().code()). setErrorMessage(apiError.message())); } -return new ControllerResult<>(records, results, true); +log.debug("CreatePartitions result(s): {}", results); Review Comment: I'll put a debug log in the controller layer before processing the final list of topics. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] akhileshchg commented on a diff in pull request #12106: KAFKA-13861: Fix the validateOnly behavior for CreatePartitions requests in KRaft mode
akhileshchg commented on code in PR #12106: URL: https://github.com/apache/kafka/pull/12106#discussion_r863474931 ## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ## @@ -499,23 +504,28 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get, () => s"$desc: Expect InvalidPartitionsException when requesting a noop") assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc) - assertEquals("Topic already has 3 partitions.", e.getCause.getMessage, desc) + exceptionMsgStr = if (TestInfoUtils.isKRaft(testInfo)) { +"Topic already has 3 partition(s)." + } else { +"Topic already has 3 partitions." Review Comment: There are too many error messages in the `ReplicationControl` layer, and I'm not sure how many other layers need to be changed. I think this can be a different PR to keep the messages intact if required. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #11889: KAFKA-13667: Make listeners mandatory in kraft mode
dengziming commented on PR #11889: URL: https://github.com/apache/kafka/pull/11889#issuecomment-1115729719 @showuon Oh yeah, I think I misunderstood your intention here. Firstly I think it's weird to have a config optional for the broker node but mandatory for the controller node and combine node so I wanted to make it mandatory for all kraft nodes. I think the problem here is just to make sure we should provide both listeners for the combined node, is this right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983: URL: https://github.com/apache/kafka/pull/11983#discussion_r863330742 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java: ## @@ -51,95 +41,58 @@ public class ConnectProtocolCompatibilityTest { private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0); private ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0); -@Rule -public MockitoRule rule = MockitoJUnit.rule(); - -@Mock -private KafkaConfigBackingStore configStorage; -private ClusterConfigState configState; - -@Before -public void setup() { -configStorage = mock(KafkaConfigBackingStore.class); -configState = new ClusterConfigState( -1L, -null, -Collections.singletonMap(connectorId1, 1), -Collections.singletonMap(connectorId1, new HashMap<>()), -Collections.singletonMap(connectorId1, TargetState.STARTED), -Collections.singletonMap(taskId1x0, new HashMap<>()), -Collections.emptySet()); -} - -@After -public void teardown() { -verifyNoMoreInteractions(configStorage); -} Review Comment: All of this is completely unnecessary and can be removed without diminishing testing coverage. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on PR #11983: URL: https://github.com/apache/kafka/pull/11983#issuecomment-1115554702 Thanks @showuon, good call with the improvement to the serialization logic. Took a bit of legwork but I've pushed a change that implements that and also cleans up some testing clutter; LMKWYT. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983: URL: https://github.com/apache/kafka/pull/11983#discussion_r863330276 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -336,13 +332,16 @@ ClusterAssignment performTaskAssignment( log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments); log.debug("Incremental task assignments: {}", incrementalTaskAssignments); +Map> revokedConnectors = transformValues(toRevoke, ConnectorsAndTasks::connectors); +Map> revokedTasks = transformValues(toRevoke, ConnectorsAndTasks::tasks); + return new ClusterAssignment( incrementalConnectorAssignments, incrementalTaskAssignments, -transformValues(toRevoke, ConnectorsAndTasks::connectors), -transformValues(toRevoke, ConnectorsAndTasks::tasks), -connectorAssignments, -taskAssignments +revokedConnectors, +revokedTasks, +diff(connectorAssignments, revokedConnectors), +diff(taskAssignments, revokedTasks) Review Comment: I don't think so; it looks like we compute load-balancing revocations later on, around line 300. At line 279, the `completeWorkerAssignment` that we derive `connectorAssignments` and `taskAssignments` from only has the `deleted` connectors and tasks removed from it; everything else is still included. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983: URL: https://github.com/apache/kafka/pull/11983#discussion_r863330276 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -336,13 +332,16 @@ ClusterAssignment performTaskAssignment( log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments); log.debug("Incremental task assignments: {}", incrementalTaskAssignments); +Map> revokedConnectors = transformValues(toRevoke, ConnectorsAndTasks::connectors); +Map> revokedTasks = transformValues(toRevoke, ConnectorsAndTasks::tasks); + return new ClusterAssignment( incrementalConnectorAssignments, incrementalTaskAssignments, -transformValues(toRevoke, ConnectorsAndTasks::connectors), -transformValues(toRevoke, ConnectorsAndTasks::tasks), -connectorAssignments, -taskAssignments +revokedConnectors, +revokedTasks, +diff(connectorAssignments, revokedConnectors), +diff(taskAssignments, revokedTasks) Review Comment: I don't think so; it looks like we compute load-balancing revocations later on, around line 300. At line 279, the `completeWorkerAssignment` that we derive `connectorAssignments` and `taskAssignments` from only has the `deleted` connectors and tasks removed from it; everything else (including load-balancing revocations and duplicated assignments) is still included. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983: URL: https://github.com/apache/kafka/pull/11983#discussion_r863329552 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java: ## @@ -230,15 +230,16 @@ public static ExtendedWorkerState deserializeMetadata(ByteBuffer buffer) { * ScheduledDelay => Int32 * */ -public static ByteBuffer serializeAssignment(ExtendedAssignment assignment) { +public static ByteBuffer serializeAssignment(ExtendedAssignment assignment, boolean sessioned) { // comparison depends on reference equality for now if (assignment == null || ExtendedAssignment.empty().equals(assignment)) { return null; } Struct struct = assignment.toStruct(); -ByteBuffer buffer = ByteBuffer.allocate(CONNECT_PROTOCOL_HEADER_V1.sizeOf() +Struct protocolHeader = sessioned ? CONNECT_PROTOCOL_HEADER_V2 : CONNECT_PROTOCOL_HEADER_V1; +ByteBuffer buffer = ByteBuffer.allocate(protocolHeader.sizeOf() + ASSIGNMENT_V1.sizeOf(struct)); -CONNECT_PROTOCOL_HEADER_V1.writeTo(buffer); +protocolHeader.writeTo(buffer); Review Comment: Ah yeah, good call! Much cleaner than what we had before. It was a little more involved than I initially thought to make this change but IMO the end result is cleaner and easier to read, so hopefully it's worth the inflation in the diff here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13859) SCRAM authentication issues with kafka-clients 3.0.1
[ https://issues.apache.org/jira/browse/KAFKA-13859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17531010#comment-17531010 ] Luke Chen commented on KAFKA-13859: --- [~opayne] , thanks for the response. That confirmed our investigation that the broker version is older than v2.8 and client version is greater than v3.0. The exception is expected. The workaround, as [~dengziming] suggested, to disable the idempotent producer, or upgrade the broker version to v2.8 or higher. Thank you. > SCRAM authentication issues with kafka-clients 3.0.1 > > > Key: KAFKA-13859 > URL: https://issues.apache.org/jira/browse/KAFKA-13859 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.1 >Reporter: Oliver Payne >Assignee: dengziming >Priority: Major > > When attempting to produce records to Kafka using a client configured with > SCRAM authentication, the authentication is being rejected, and the following > exception is thrown: > {{org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster > authorization failed.}} > I am seeing this happen with a Springboot service that was recently upgraded > to 2.6.5. After looking into this, I learned that Springboot moved to > kafka-clients 3.0.1 from 3.0.0 in that version. And sure enough, downgrading > to kafka-clients resolved the issue, with no changes made to the configs. > I have also attempted to connect to a separate server with kafka-clients > 3.0.1, using plaintext authentication. That works fine. So the issue appears > to be with SCRAM authentication. > I will note that I am attempting to connect to an AWS MSK instance. We use > SCRAM-SHA-512 as our sasl mechanism, using the basic {{ScramLoginModule.}} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] showuon commented on pull request #12105: KAFKA-13859: Disable idempotence on SCRAM authentication
showuon commented on PR #12105: URL: https://github.com/apache/kafka/pull/12105#issuecomment-1115548710 > To be clear, the broker would have to be older than 2.8 for the issue to occur. The server change for KIP-679 happened in Apache Kafka 2.8. Yes, the user confirmed that their broker version is in v2.6.2 and client version is in v3.0.1. This is the expected behavior. Thanks @dengziming and @ijuma ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
showuon commented on code in PR #11983: URL: https://github.com/apache/kafka/pull/11983#discussion_r863325064 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -336,13 +332,16 @@ ClusterAssignment performTaskAssignment( log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments); log.debug("Incremental task assignments: {}", incrementalTaskAssignments); +Map> revokedConnectors = transformValues(toRevoke, ConnectorsAndTasks::connectors); +Map> revokedTasks = transformValues(toRevoke, ConnectorsAndTasks::tasks); + return new ClusterAssignment( incrementalConnectorAssignments, incrementalTaskAssignments, -transformValues(toRevoke, ConnectorsAndTasks::connectors), -transformValues(toRevoke, ConnectorsAndTasks::tasks), -connectorAssignments, -taskAssignments +revokedConnectors, +revokedTasks, +diff(connectorAssignments, revokedConnectors), +diff(taskAssignments, revokedTasks) Review Comment: I think the `connectorAssignments` should be equal to `diff(connectorAssignments, revokedConnectors)` and `taskAssignments == diff(taskAssignments, revokedTasks)`, because in L279: https://github.com/apache/kafka/pull/11983/files#diff-e24067b121eb960feebfa099bd9c30382e330eaf5db39302a9d7a50e29b3acb4R279-R283 We already removed the revoked connectors/tasks. Is my understanding correct? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983: URL: https://github.com/apache/kafka/pull/11983#discussion_r863308185 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -108,18 +107,15 @@ public Map performAssignment(String leaderId, String protoco log.debug("Max config offset root: {}, local snapshot config offsets root: {}", maxOffset, coordinator.configSnapshot().offset()); -short protocolVersion = memberConfigs.values().stream() -.allMatch(state -> state.assignment().version() == CONNECT_PROTOCOL_V2) -? CONNECT_PROTOCOL_V2 -: CONNECT_PROTOCOL_V1; +short protocolVersion = ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion(); Review Comment: Yep, exactly Should've known that when I implemented KIP-507 originally but was still getting my bearings with the group coordinator logic. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -108,18 +107,15 @@ public Map performAssignment(String leaderId, String protoco log.debug("Max config offset root: {}, local snapshot config offsets root: {}", maxOffset, coordinator.configSnapshot().offset()); -short protocolVersion = memberConfigs.values().stream() -.allMatch(state -> state.assignment().version() == CONNECT_PROTOCOL_V2) -? CONNECT_PROTOCOL_V2 -: CONNECT_PROTOCOL_V1; +short protocolVersion = ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion(); Review Comment: Yep, exactly Should've known that when I implemented KIP-507 originally but was still getting my bearings with the group coordinator logic. Better late than never! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983: URL: https://github.com/apache/kafka/pull/11983#discussion_r863308048 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -336,13 +332,16 @@ ClusterAssignment performTaskAssignment( log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments); log.debug("Incremental task assignments: {}", incrementalTaskAssignments); +Map> revokedConnectors = transformValues(toRevoke, ConnectorsAndTasks::connectors); +Map> revokedTasks = transformValues(toRevoke, ConnectorsAndTasks::tasks); + return new ClusterAssignment( incrementalConnectorAssignments, incrementalTaskAssignments, -transformValues(toRevoke, ConnectorsAndTasks::connectors), -transformValues(toRevoke, ConnectorsAndTasks::tasks), -connectorAssignments, -taskAssignments +revokedConnectors, +revokedTasks, +diff(connectorAssignments, revokedConnectors), +diff(taskAssignments, revokedTasks) Review Comment: Yep! There was a bug here. FWIW the same issue is already fixed by https://github.com/apache/kafka/pull/12019. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #12041: MINOR: ignore unused configuration when ConsumerCoordinator is not constructed
C0urante commented on PR #12041: URL: https://github.com/apache/kafka/pull/12041#issuecomment-1115515896 Thanks Guozhang. I think the cost of logging warnings in cases like this is fairly low as users can and should adjust their configurations to not use nonsensical properties, and the benefit can be high in the event that a user is confused about client behavior. I do sympathize with concerns that the warning for an unused property may make it seem like the property is unconditionally unrecognized (i.e., not defined by a client at all) instead of conditionally unrecognized (i.e., not used because of other properties). One alternative could be to use the newly-introduced [ConnectUtils::ensureProperty](https://github.com/apache/kafka/blob/8245c9a3d5af2ad891844194a5fa281af471b568/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java#L77-L101) or something similar to it (possibly one that logs a warning if _any_ value for a specific property is given, regardless of whether it matches the default). This way, we could continue logging warnings for cases like these, but make it clear exactly why the property should not be included in the config. Either way, I think the piecemeal logic introduced in this PR is suboptimal. Dedicating one line for every to-be-ignored property is unnecessary if we want to remove these warnings for all properties defined by a client; in that case, we can use the [approach I described earlier](https://github.com/apache/kafka/pull/12041#issuecomment-1100583118), which will be easier to maintain and take up less space. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13857) The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition
[ https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17531003#comment-17531003 ] RivenSun commented on KAFKA-13857: -- [~guozhang] Thank you for your reply and approval. I don't have a strong need to query LEO right now. This feature may can be supported in future releases. I may be busy with other things recently, and anyone interested in this can assign this JIRA to themselves. Thanks. > The listOffsets method of KafkaAdminClient should support returning > logEndOffset of topicPartition > -- > > Key: KAFKA-13857 > URL: https://issues.apache.org/jira/browse/KAFKA-13857 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: RivenSun >Priority: Major > > The server side currently handles the LIST_OFFSETS request process as follows: > {code:java} > KafkaApis.handleListOffsetRequest() -> > KafkaApis.handleListOffsetRequestV1AndAbove() -> > ReplicaManager.fetchOffsetForTimestamp() -> > Partition.fetchOffsetForTimestamp(){code} > > In the last method above, it is obvious that when the client side does not > pass the isolationLevel value, the server side supports returning > localLog.logEndOffset. > {code:java} > val lastFetchableOffset = isolationLevel match { > case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset > case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark > case None => localLog.logEndOffset > } > {code} > > > KafkaAdminClient is an operation and maintenance management tool, which > *should be different from the listOffsets-related methods (offsetsForTimes, > beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not > be limited by the value of {color:#ff}isolationLevel {color}in the > ListOffsetsOptions parameter.* > In the current KafkaAdminClient.listOffsets() method, both the AdminClient > and the server consider isolationLevel as a required parameter: > 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException > will be thrown when AdminClient executes listOffsets() method. > {code:java} > ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code} > 2) The current logic for converting isolationLevel on the server side has not > yet handled the case where the user passes in a value that is neither > READ_UNCOMMITTED nor READ_COMMITTED : > {code:java} > val isolationLevelOpt = if (isClientRequest) > Some(offsetRequest.isolationLevel) > else > None {code} > {code:java} > public IsolationLevel isolationLevel() { > return IsolationLevel.forId(data.isolationLevel()); > } {code} > h1. > h2. Suggestion: > Added a new enum `NONE` in IsolationLevel, only dedicated to > AdminClient.listOffsets() method. > This change may cause the highestSupportedVersion of > ApiMessageType.LIST_OFFSETS to increase by one. > > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] LeonSamuel opened a new pull request, #12115: Update README.md
LeonSamuel opened a new pull request, #12115: URL: https://github.com/apache/kafka/pull/12115 As someone new to Kafka, it would have been welcoming to see a succinct high-level overview of what Kafka is as the first piece of documentation. As I clicked on the home page of Kafka website, I read about seemingly disconnected pieces of what Kafka could do but the project didn't succeed in taking on a tangible form or differentiating itself from similar scaling systems. This leaves it up to the user to either quit or continue to push into a third attempt of research - but now with more apprehension. I thought my additions (quoted from the Kafka website) would add some clarity and decrease the barrier for those who may want to quickly learn about the system. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [X] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #12106: KAFKA-13861: Fix the validateOnly behavior for CreatePartitions requests in KRaft mode
hachikuji commented on code in PR #12106: URL: https://github.com/apache/kafka/pull/12106#discussion_r863259182 ## core/src/test/scala/unit/kafka/server/ControllerApisTest.scala: ## @@ -730,8 +729,45 @@ class ControllerApisTest { request.topics().add(new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5)) request.topics().add(new CreatePartitionsTopic().setName("baz").setAssignments(null).setCount(5)) assertEquals(Set(new CreatePartitionsTopicResult().setName("foo"). -setErrorCode(NONE.code()). -setErrorMessage(null), + setErrorCode(NONE.code()). + setErrorMessage(null), + new CreatePartitionsTopicResult().setName("bar"). +setErrorCode(INVALID_REQUEST.code()). +setErrorMessage("Duplicate topic name."), + new CreatePartitionsTopicResult().setName("baz"). +setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()). +setErrorMessage(null)), + controllerApis.createPartitions(ANONYMOUS_CONTEXT, request, +_ => Set("foo", "bar")).get().asScala.toSet) + } + + @Test + def testValidateOnlyCreatePartitionsRequest(): Unit = { Review Comment: Is this test basically the same as `testCreatePartitionsRequest`? Maybe we can get rid of one and turn the other into a `@ParameterizedTest` with `validateOnly` as the parameter? ## metadata/src/main/java/org/apache/kafka/controller/Controller.java: ## @@ -328,11 +328,15 @@ CompletableFuture updateFeatures( * Create partitions on certain topics. * * @param topicsThe list of topics to create partitions for. + * @param validateOnly If true, create partitions is just validated and returns response Review Comment: nit: how about this? > If true, the request is validated, but no partitions will be created. ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -1364,7 +1364,8 @@ Boolean isBrokerUnfenced(int brokerId) { setErrorCode(apiError.error().code()). setErrorMessage(apiError.message())); } -return new ControllerResult<>(records, results, true); +log.debug("CreatePartitions result(s): {}", results); Review Comment: Hmm.. It is useful to know in the logs if `validateOnly` was set. ## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ## @@ -499,23 +504,28 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { e = assertThrows(classOf[ExecutionException], () => alterResult.values.get(topic2).get, () => s"$desc: Expect InvalidPartitionsException when requesting a noop") assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException], desc) - assertEquals("Topic already has 3 partitions.", e.getCause.getMessage, desc) + exceptionMsgStr = if (TestInfoUtils.isKRaft(testInfo)) { +"Topic already has 3 partition(s)." + } else { +"Topic already has 3 partitions." Review Comment: I wonder if we can unify the error messages? The differences do not seem interesting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13867) Improve JavaDoc for MetadataVersion.java
[ https://issues.apache.org/jira/browse/KAFKA-13867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530971#comment-17530971 ] Alyssa Huang commented on KAFKA-13867: -- ^ just confirming that you mean `MetadataVersion#ibpVersion`? > Improve JavaDoc for MetadataVersion.java > > > Key: KAFKA-13867 > URL: https://issues.apache.org/jira/browse/KAFKA-13867 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] cmccabe commented on pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
cmccabe commented on PR #12072: URL: https://github.com/apache/kafka/pull/12072#issuecomment-1115466993 Thanks for this PR, @ahuang98. And thanks to everyone who reviewed. I filed https://issues.apache.org/jira/browse/KAFKA-13867 for two very minor issues that we discussed here (one javadoc improvement issue, one small field name issue). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13867) Improve JavaDoc for MetadataVersion.java
[ https://issues.apache.org/jira/browse/KAFKA-13867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530969#comment-17530969 ] Colin McCabe commented on KAFKA-13867: -- I also suggest renaming `MetadataVersion#version` to `MetadataVersion#fullName` > Improve JavaDoc for MetadataVersion.java > > > Key: KAFKA-13867 > URL: https://issues.apache.org/jira/browse/KAFKA-13867 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (KAFKA-13867) Improve JavaDoc for MetadataVersion.java
Colin McCabe created KAFKA-13867: Summary: Improve JavaDoc for MetadataVersion.java Key: KAFKA-13867 URL: https://issues.apache.org/jira/browse/KAFKA-13867 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] cmccabe commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
cmccabe commented on code in PR #12072: URL: https://github.com/apache/kafka/pull/12072#discussion_r863258243 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.kafka.common.record.RecordVersion; + +/** + * This class contains the different Kafka versions. + * Right now, we use them for upgrades - users can configure the version of the API brokers will use to communicate between themselves. + * This is only for inter-broker communications - when communicating with clients, the client decides on the API version. + * + * Note that the ID we initialize for each version is important. + * We consider a version newer than another if it is lower in the enum list (to avoid depending on lexicographic order) + * + * Since the api protocol may change more than once within the same release and to facilitate people deploying code from + * trunk, we have the concept of internal versions (first introduced during the 0.10.0 development cycle). For example, + * the first time we introduce a version change in a release, say 0.10.0, we will add a config value "0.10.0-IV0" and a + * corresponding enum constant IBP_0_10_0-IV0. We will also add a config value "0.10.0" that will be mapped to the + * latest internal version object, which is IBP_0_10_0-IV0. When we change the protocol a second time while developing + * 0.10.0, we will add a new config value "0.10.0-IV1" and a corresponding enum constant IBP_0_10_0-IV1. We will change + * the config value "0.10.0" to map to the latest internal version IBP_0_10_0-IV1. The config value of + * "0.10.0-IV0" is still mapped to IBP_0_10_0-IV0. This way, if people are deploying from trunk, they can use + * "0.10.0-IV0" and "0.10.0-IV1" to upgrade one internal version at a time. For most people who just want to use + * released version, they can use "0.10.0" when upgrading to the 0.10.0 release. + */ +public enum MetadataVersion { +IBP_0_8_0(-1), +IBP_0_8_1(-1), +IBP_0_8_2(-1), +IBP_0_9_0(-1), + +// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format. +IBP_0_10_0_IV0(-1), + +// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL handshake). +IBP_0_10_0_IV1(-1), + +// introduced for JoinGroup protocol change in KIP-62 +IBP_0_10_1_IV0(-1), + +// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit). +IBP_0_10_1_IV1(-1), + +// introduced ListOffsetRequest v1 in KIP-79 +IBP_0_10_1_IV2(-1), + +// introduced UpdateMetadataRequest v3 in KIP-103 +IBP_0_10_2_IV0(-1), + +// KIP-98 (idempotent and transactional producer support) +IBP_0_11_0_IV0(-1), + +// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107 +IBP_0_11_0_IV1(-1), + +// Introduced leader epoch fetches to the replica fetcher via KIP-101 +IBP_0_11_0_IV2(-1), + +// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112 +IBP_1_0_IV0(-1), + +// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental fetch requests, +// and KafkaStorageException for fetch requests. +IBP_1_1_IV0(-1), + +// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log divergence between leader and follower after fast leader fail over) +IBP_2_0_IV0(-1), + +// Several request versions were bumped due to KIP-219 (Improve quota communication) +IBP_2_0_IV1(-1), + +// Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211) +IBP_2_1_IV0(-1), + +// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320) +IBP_2_1_IV1(-1), + +// Support ZStandard Compression Codec (KIP-110) +IBP_2_1_IV2(-1), + +// Introduced broker generation (KIP-380), and +// LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1 +IBP_2_2_IV0(-1), + +// New
[GitHub] [kafka] cmccabe merged pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
cmccabe merged PR #12072: URL: https://github.com/apache/kafka/pull/12072 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
cmccabe commented on code in PR #12072: URL: https://github.com/apache/kafka/pull/12072#discussion_r863255928 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java: ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.stream.Collectors; +import org.apache.kafka.common.config.ConfigDef.Validator; +import org.apache.kafka.common.config.ConfigException; + +public class MetadataVersionValidator implements Validator { + +@Override +public void ensureValid(String name, Object value) { +try { +MetadataVersion.fromVersionString(value.toString()); +} catch (IllegalArgumentException e) { +throw new ConfigException(name, value.toString(), e.getMessage()); +} +} + +@Override +public String toString() { Review Comment: > I'm not sure if I follow the discussion on the toString location, are we saying that it should be moved? I think the proposal was to move `MetadataVersionValidator` to a (static?) inner class of `MetadataVersion`. In general it's nice to keep the source files small, though, I think, so I'd recommend leaving as-is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] akhileshchg commented on a diff in pull request #12106: KAFKA-13861: Fix the validateOnly behavior for CreatePartitions requests in KRaft mode
akhileshchg commented on code in PR #12106: URL: https://github.com/apache/kafka/pull/12106#discussion_r863179905 ## core/src/test/java/kafka/test/MockController.java: ## @@ -410,10 +410,16 @@ public CompletableFuture updateFeatures( throw new UnsupportedOperationException(); } +boolean lastCreatePartitionsValidateOnly = false; Review Comment: Added a new unit test and remove `lastCreatePartitionsValidateOnly` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530937#comment-17530937 ] François Rosière commented on KAFKA-13864: -- [~cadonna], table updated and discussion started. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > Attachments: interceptor_constructor_client.patch > > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Comment Edited] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530937#comment-17530937 ] François Rosière edited comment on KAFKA-13864 at 5/2/22 8:44 PM: -- [~cadonna], table updated and discussion started. Thanks for the infos. was (Author: JIRAUSER288866): [~cadonna], table updated and discussion started. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > Attachments: interceptor_constructor_client.patch > > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] akhileshchg commented on a diff in pull request #12106: KAFKA-13861: Fix the validateOnly behavior for CreatePartitions requests in KRaft mode
akhileshchg commented on code in PR #12106: URL: https://github.com/apache/kafka/pull/12106#discussion_r863095080 ## core/src/test/java/kafka/test/MockController.java: ## @@ -410,10 +410,16 @@ public CompletableFuture updateFeatures( throw new UnsupportedOperationException(); } +boolean lastCreatePartitionsValidateOnly = false; Review Comment: The `MockController` doesn't maintain any state of the topics/partitions that are created and deleted. So, I thought this was a more straightforward fix to check if the `validateOnly` flag is used. Please let met know if you have some other ideas. ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -1364,7 +1364,25 @@ Boolean isBrokerUnfenced(int brokerId) { setErrorCode(apiError.error().code()). setErrorMessage(apiError.message())); } -return new ControllerResult<>(records, results, true); +StringBuilder resultsBuilder = new StringBuilder(); Review Comment: Sure. That seems reasonable. I did this to match the `createTopics` log statement. ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -767,7 +767,7 @@ class ControllerApis(val requestChannel: RequestChannel, setErrorCode(TOPIC_AUTHORIZATION_FAILED.code)) } } -controller.createPartitions(context, topics).thenApply { results => +controller.createPartitions(context, topics, request.validateOnly()).thenApply { results => Review Comment: Done. ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -767,7 +767,7 @@ class ControllerApis(val requestChannel: RequestChannel, setErrorCode(TOPIC_AUTHORIZATION_FAILED.code)) } } -controller.createPartitions(context, topics).thenApply { results => +controller.createPartitions(context, topics, request.validateOnly()).thenApply { results => Review Comment: I enabled the test for both KRaft and Zk modes now. The existing test is robust and covers all the cases. ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1664,13 +1664,14 @@ public CompletableFuture updateFeatures( @Override public CompletableFuture> createPartitions( Review Comment: That's a good idea. Will do this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13636) Committed offsets could be deleted during a rebalance if a group did not commit for a while
[ https://issues.apache.org/jira/browse/KAFKA-13636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Olson updated KAFKA-13636: - Description: The group coordinator might delete invalid offsets during a group rebalance. During a rebalance, the coordinator is relying on the last commit timestamp ({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state modification {_}timestamp (currentStateTimestamp{_}) to detect expired offsets. This is relatively easy to reproduce by playing with group.initial.rebalance.delay.ms, offset.retention.minutes and offset.check.retention.interval, I uploaded an example on: [https://github.com/Dabz/kafka-example/tree/master/docker/offsets-retention] . This script does: * Start a broker with: offset.retention.minute=2, o[ffset.check.retention.interval.ms=|http://offset.check.retention.interval.ms/]1000, group.initial.rebalance.delay=2 * Produced 10 messages * Create a consumer group to consume 10 messages, and disable auto.commit to only commit a few times * Wait 3 minutes, then the Consumer get a {{kill -9}} * Restart the consumer after a few seconds * The consumer restart from {{auto.offset.reset}} , the offset got removed The cause is due to the GroupMetadata.scala: * When the group get emptied, the {{subscribedTopics}} is set to {{Set.empty}} ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L520-L521]) * When the new member joins, we add the new member right away in the group ; BUT the {{subscribedTopics}} is only updated once the migration is over (in the initNewGeneration) (which could take a while due to the {{{}group.initial.rebalance.delay{}}}) * When the log cleaner got executed, {{subscribedTopics.isDefined}} returns true as {{Set.empty != None}} (the underlying condition) * Thus we enter [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L782-L785] with an empty {{subscribedTopics}} list and we are relying on the {{commitTimestamp}} regardless of the {{currentStateTimestamp}} This seem to be a regression generated by KIP-496 https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets#KIP496:AdministrativeAPItodeleteconsumeroffsets-ProposedChanges (KAFKA-8338, KAFKA-8370) was: The group coordinator might delete invalid offsets during a group rebalance. During a rebalance, the coordinator is relying on the last commit timestamp ({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state modification {_}timestampt (currentStateTimestamp{_}) to detect expired offsets. This is relatively easy to reproduce by playing with group.initial.rebalance.delay.ms, offset.retention.minutes and offset.check.retention.interval, I uploaded an example on: [https://github.com/Dabz/kafka-example/tree/master/docker/offsets-retention] . This script does: * Start a broker with: offset.retention.minute=2, o[ffset.check.retention.interval.ms=|http://offset.check.retention.interval.ms/]1000, group.initial.rebalance.delay=2 * Produced 10 messages * Create a consumer group to consume 10 messages, and disable auto.commit to only commit a few times * Wait 3 minutes, then the Consumer get a {{kill -9}} * Restart the consumer after a few seconds * The consumer restart from {{auto.offset.reset}} , the offset got removed The cause is due to the GroupMetadata.scala: * When the group get emptied, the {{subscribedTopics}} is set to {{Set.empty}} ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L520-L521]) * When the new member joins, we add the new member right away in the group ; BUT the {{subscribedTopics}} is only updated once the migration is over (in the initNewGeneration) (which could take a while due to the {{{}group.initial.rebalance.delay{}}}) * When the log cleaner got executed, {{subscribedTopics.isDefined}} returns true as {{Set.empty != None}} (the underlying condition) * Thus we enter [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L782-L785] with an empty {{subscribedTopics}} list and we are relying on the {{commitTimestamp}} regardless of the {{currentStateTimestamp}} This seem to be a regression generated by KIP-496 https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets#KIP496:AdministrativeAPItodeleteconsumeroffsets-ProposedChanges > Committed offsets could be deleted during a rebalance if a group did not > commit for a while > --- > > Key: KAFKA-13636 > URL: https://issues.apache.org/jira/browse/KAFKA-13636 > Project: Kafka >
[GitHub] [kafka] cadonna commented on a diff in pull request #12114: MINOR: Note that slf4j-log4j in version 1.7.35+ should be used
cadonna commented on code in PR #12114: URL: https://github.com/apache/kafka/pull/12114#discussion_r863082871 ## docs/upgrade.html: ## @@ -73,7 +73,11 @@ Notable changes in 3 via Connect worker and/or connector configuration. Connect may enable idempotent producers by default in a future major release. Kafka has replaced log4j and slf4j-log4j12 with reload4j and slf4j-reload4j due to security concerns. Review Comment: @ijuma changed the text as you proposed. I will cherry-pick the commit to 3.2 and 3.1. @tombentley Since this is just a doc change, we do not need a new RC, right? We can port the change directly to the doc repo in case RC1 passes the votes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13866) Support more advanced time retention policies
Matthias J. Sax created KAFKA-13866: --- Summary: Support more advanced time retention policies Key: KAFKA-13866 URL: https://issues.apache.org/jira/browse/KAFKA-13866 Project: Kafka Issue Type: Improvement Components: config, core, log cleaner Reporter: Matthias J. Sax Time-based retention policy compares the record timestamp to broker wall-clock time. Those semantics are questionable and also lead to issues for data reprocessing: If one want to re-process older data then retention time, it's not possible as broker expire those record aggressively and user need to increate the retention time accordingly. Especially for Kafka Stream, we have seen many cases when users got bit by the current behavior. It would be best, if Kafka would track _two_ timestamps per record: the record event-time (as the broker do currently), plus the log append-time (which is only tracked currently if the topic is configured with "append-time" tracking, but the issue is, that it overwrite the producer provided record event-time). Tracking both timestamps would allow to set a pure wall-clock time retention time plus a pure event-time retention time policy: * Wall-clock time: keep (at least) the date X days after writing * Event-time: keep (at max) the X days worth of event-time data Comparing wall-clock time to wall-clock time and event-time to event-time provides much cleaner semantics. The idea is to combine both policies and only expire data if both policies trigger. For the event-time policy, the broker would need to track "stream time" as max event-timestamp it has see per partition (similar to how Kafka Streams is tracking "stream time" client side). Note the difference between "at least" and "at max" above: for the data-reprocessing case, the max-based event-time policy avoids that the broker would keep a huge history for the reprocessing case. It would be part of a KIP discussion on the details how wall-clock/event-time and mix/max policies could be combined. For example, it might also be useful to have the following policy: keep at least X days worth of event-time history no matter how long the data is already stored (ie, there would only be an event-time base expiration but not wall-clock time). It could also be combined with a wall-clock time expiration: delete data only after it's at least X days old and stored for at least Y days. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] ahuang98 commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
ahuang98 commented on code in PR #12072: URL: https://github.com/apache/kafka/pull/12072#discussion_r863014438 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java: ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.stream.Collectors; +import org.apache.kafka.common.config.ConfigDef.Validator; +import org.apache.kafka.common.config.ConfigException; + +public class MetadataVersionValidator implements Validator { + +@Override +public void ensureValid(String name, Object value) { +try { +MetadataVersion.fromVersionString(value.toString()); +} catch (IllegalArgumentException e) { +throw new ConfigException(name, value.toString(), e.getMessage()); +} +} + +@Override +public String toString() { Review Comment: I'll remove the `distinct` call. I'm not sure if I follow the discussion on the `toString` location, are we saying that it should be moved? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)
mimaison commented on PR #11773: URL: https://github.com/apache/kafka/pull/11773#issuecomment-1115108546 Sorry @C0urante for the delays, we were at Kafka Summit last week and I'm still trying to catch up on stuff. I'm hoping to take another look this week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #12067: KAFKA-13780: Generate OpenAPI file for Connect REST API
mimaison commented on PR #12067: URL: https://github.com/apache/kafka/pull/12067#issuecomment-1115103573 @kkonstantine @rhauch Can you take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #12107: MINOR: fix ClientQuotasRequestTest.testAlterClientQuotasBadIp
cmccabe merged PR #12107: URL: https://github.com/apache/kafka/pull/12107 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #11874: Fix typos in configuration docs
C0urante commented on code in PR #11874: URL: https://github.com/apache/kafka/pull/11874#discussion_r862970744 ## clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java: ## @@ -216,8 +216,10 @@ private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking." + " Note that if this config is set to be greater than 1 and enable.idempotence is set to false, there is a risk of" + " message re-ordering after a failed send due to retries (i.e., if retries are enabled)." -+ " Additionally, enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "." -+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled."; ++ " Additionally, enabling idempotence requires the value of this configuration to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "." ++ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. " ++ " Record ordering is preserved when enable.idempotence is set to true for idempotent " ++ " producer (or transactional producer), even when max in-flight requests are greater than 1 (supported up to 5)."; Review Comment: The last sentence is redundant though, isn't it? The docs already state: > Note that if this config is set to be greater than 1 and enable.idempotence is set to false, there is a risk of message re-ordering after a failed send due to retries (i.e., if retries are enabled). It's fine if we want to clarify that re-ordering is not a risk when `enable.idempotence` is set to true, but we should also try to keep the docs here concise. What about adding a brief followup in that sentence instead? > Note that if this config is set to be greater than 1 and enable.idempotence is set to false, there is a risk of message re-ordering after a failed send due to retries (i.e., if retries are enabled); ordering will be preserved if retries are disabled and/or enable.idempotence is true. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13859) SCRAM authentication issues with kafka-clients 3.0.1
[ https://issues.apache.org/jira/browse/KAFKA-13859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530796#comment-17530796 ] Oliver Payne commented on KAFKA-13859: -- Sorry for the late response. I see that this has already been marked resolved, but wanted to answer the questions I left hanging here. [~dengziming] The following exception is coming from the client logs: {code:java} org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed. at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$6(KafkaTemplate.java:690) ~[spring-kafka-2.8.5.jar:2.8.5] at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer$1.onCompletion(DefaultKafkaProducerFactory.java:1001) ~[spring-kafka-2.8.5.jar:2.8.5] at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1350) ~[kafka-clients-3.0.1.jar:na] at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273) ~[kafka-clients-3.0.1.jar:na] at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:161) ~[kafka-clients-3.0.1.jar:na] at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:773) ~[kafka-clients-3.0.1.jar:na] at org.apache.kafka.clients.producer.internals.Sender.maybeAbortBatches(Sender.java:498) ~[kafka-clients-3.0.1.jar:na] at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:307) ~[kafka-clients-3.0.1.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) ~[kafka-clients-3.0.1.jar:na] at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na] Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed. {code} Our broker version is 2.6.2 Here are our producer configs: {code:java} "security.protocol" -> "SASL_SSL""value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer""sasl.mechanism" -> "SCRAM-SHA-512""sasl.jaas.config" -> "org.apache.kafka.common.security.scram.ScramLoginModule required username="redacted" password="redacted";""bootstrap.servers" -> "server-name-redacted1:9096, server-name-redacted2:9096, server-name-redacted3:9096""key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer""ssl.endpoint.identification.algorithm" -> "https" {code} I also added the enable.idempotence = false per your recommendation, and it seemed to resolve the issue. Thanks for the suggestion > SCRAM authentication issues with kafka-clients 3.0.1 > > > Key: KAFKA-13859 > URL: https://issues.apache.org/jira/browse/KAFKA-13859 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.1 >Reporter: Oliver Payne >Assignee: dengziming >Priority: Major > > When attempting to produce records to Kafka using a client configured with > SCRAM authentication, the authentication is being rejected, and the following > exception is thrown: > {{org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster > authorization failed.}} > I am seeing this happen with a Springboot service that was recently upgraded > to 2.6.5. After looking into this, I learned that Springboot moved to > kafka-clients 3.0.1 from 3.0.0 in that version. And sure enough, downgrading > to kafka-clients resolved the issue, with no changes made to the configs. > I have also attempted to connect to a separate server with kafka-clients > 3.0.1, using plaintext authentication. That works fine. So the issue appears > to be with SCRAM authentication. > I will note that I am attempting to connect to an AWS MSK instance. We use > SCRAM-SHA-512 as our sasl mechanism, using the basic {{ScramLoginModule.}} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] ijuma commented on a diff in pull request #12114: MINOR: Note that slf4j-log4j in version 1.7.35+ should be used
ijuma commented on code in PR #12114: URL: https://github.com/apache/kafka/pull/12114#discussion_r862968313 ## docs/upgrade.html: ## @@ -73,7 +73,11 @@ Notable changes in 3 via Connect worker and/or connector configuration. Connect may enable idempotent producers by default in a future major release. Kafka has replaced log4j and slf4j-log4j12 with reload4j and slf4j-reload4j due to security concerns. Review Comment: We should also update the same text in the 3.1 branch once we merge this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12114: MINOR: Note that slf4j-log4j in version 1.7.35+ should be used
ijuma commented on code in PR #12114: URL: https://github.com/apache/kafka/pull/12114#discussion_r862967880 ## docs/upgrade.html: ## @@ -73,7 +73,11 @@ Notable changes in 3 via Connect worker and/or connector configuration. Connect may enable idempotent producers by default in a future major release. Kafka has replaced log4j and slf4j-log4j12 with reload4j and slf4j-reload4j due to security concerns. Review Comment: Maybe we can say something like: > Kafka has replaced log4j and slf4j-log4j12 with reload4j and slf4j-reload4j due to security concerns. > This only affects modules that specify a logging backend (`connect-runtime` and `kafka-tools` are two such > examples). A number of modules, including `kafka-clients`, leave it to the application to specify the logging > backend. More information can be found at https://reload4j.qos.ch;>reload4j. > Projects that depend on the affected modules from the Kafka project should use > https://www.slf4j.org/manual.html#swapping;>slf4j-log4j12 version 1.7.35 or above or > slf4j-reload4j to avoid > https://www.slf4j.org/codes.html#no_tlm;>possible compatibility issues originating from the logging framework. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)
C0urante commented on PR #11773: URL: https://github.com/apache/kafka/pull/11773#issuecomment-1115054516 @tombentley @mimaison I'd really like it if we could confirm the intended direction for this API. I'm willing to go whichever direction you believe is best, but (as Tom has noted) given that this is fairly green-field for Connect, I want to make sure that we consider our options carefully and set a good precedent for future APIs like this. If you believe we've done our due diligence, I'm happy to implement whatever approach you believe is best; please just confirm what you would like to see here if there are still any reservations about the current state of the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530788#comment-17530788 ] Bruno Cadonna commented on KAFKA-13864: --- [~frosiere] When you write a KIP you need to follow the process described in the [KIP doc|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] under section "Process". Otherwise we will lose track of the KIP and nobody will ever read your KIP and approve it. I incremented the "Next KIP Number" for you. Now you need to add your KIP at the end of the table in section "KIPs under discussion". Then you need to start a [DISCUSSION] thread on the dev mailing list. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > Attachments: interceptor_constructor_client.patch > > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] cadonna commented on pull request #12114: MINOR: Note that slf4j-log4j in version 1.7.35+ should be used
cadonna commented on PR #12114: URL: https://github.com/apache/kafka/pull/12114#issuecomment-1115025366 @ijuma -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna opened a new pull request, #12114: MINOR: Note that slf4j-log4j in version 1.7.35+ should be used
cadonna opened a new pull request, #12114: URL: https://github.com/apache/kafka/pull/12114 Adds a note to the upgrade notes to use slf4j-log4j version 1.7.35+ [1] or slf4j-reload4j to avoid possible compatibility issues originating from the logging framework [2]. [1] https://www.slf4j.org/manual.html#swapping [2] https://www.slf4j.org/codes.html#no_tlm ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12019: KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing
YeonCheolGit commented on code in PR #12019: URL: https://github.com/apache/kafka/pull/12019#discussion_r862388033 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ## @@ -110,14 +114,18 @@ public Map performAssignment(String leaderId, String protoco : CONNECT_PROTOCOL_V1; Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator); +Map assignments; if (leaderOffset == null) { -Map assignments = fillAssignments( +assignments = fillAssignments( memberConfigs.keySet(), Assignment.CONFIG_MISMATCH, -leaderId, memberConfigs.get(leaderId).url(), maxOffset, Collections.emptyMap(), -Collections.emptyMap(), Collections.emptyMap(), 0, protocolVersion); -return serializeAssignments(assignments); +leaderId, memberConfigs.get(leaderId).url(), maxOffset, +ClusterAssignment.EMPTY, 0, protocolVersion); +} else { +assignments = performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator, protocolVersion); } -return performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator, protocolVersion); +Map result = serializeAssignments(assignments); +log.debug("Finished assignment"); Review Comment: @C0urante This works with `Map assignment's'`. So maybe this? ```suggestion log.debug("Finished assignments"); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r862915029 ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -34,22 +34,38 @@ */ public abstract class SampledStat implements MeasurableStat { -private double initialValue; +private final double initialValue; private int current = 0; + protected List samples; public SampledStat(double initialValue) { this.initialValue = initialValue; this.samples = new ArrayList<>(2); } +/** + * {@inheritDoc} + * + * On every record, do the following: + * 1. Check if the current window has expired + * 2. If yes, then advance the current pointer to new window. The start time of the new window is set to nearest + *possible starting point for the new window. The nearest starting point occurs at config.timeWindowMs intervals + *from the end time of last known window. + * 3. Update the recorded value for the current window + * 4. Increase the number of event count + */ @Override -public void record(MetricConfig config, double value, long timeMs) { -Sample sample = current(timeMs); -if (sample.isComplete(timeMs, config)) -sample = advance(config, timeMs); -update(sample, config, value, timeMs); -sample.eventCount += 1; +public void record(MetricConfig config, double value, long recordingTimeMs) { +Sample sample = current(recordingTimeMs); +if (sample.isComplete(recordingTimeMs, config)) { +final long previousWindowStartTime = sample.getLastWindowMs(); +sample = advance(config, recordingTimeMs); Review Comment: Modified the code to make it more readable. It is not exactly what you mentioned but should be more readable now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r862909115 ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -138,6 +170,46 @@ public boolean isComplete(long timeMs, MetricConfig config) { return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow(); } +public boolean isActive() { Review Comment: All changes to public API have been reverted. Addressing the core fix does not require any changes to the public APIs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r862908720 ## clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java: ## @@ -52,10 +51,6 @@ public Rate(TimeUnit unit, SampledStat stat) { this.unit = unit; } -public String unitName() { Review Comment: All changes to public API have been reverted. Addressing the core fix does not require any changes to the public APIs. ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -110,25 +127,40 @@ public String toString() { protected void purgeObsoleteSamples(MetricConfig config, long now) { long expireAge = config.samples() * config.timeWindowMs(); for (Sample sample : samples) { -if (now - sample.lastWindowMs >= expireAge) +if (now - sample.getLastWindowMs() >= expireAge) sample.reset(now); } } protected static class Sample { -public double initialValue; -public long eventCount; -public long lastWindowMs; -public double value; +private double initialValue; Review Comment: All changes to public API have been reverted. Addressing the core fix does not require any changes to the public APIs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r862908241 ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -110,25 +127,40 @@ public String toString() { protected void purgeObsoleteSamples(MetricConfig config, long now) { long expireAge = config.samples() * config.timeWindowMs(); for (Sample sample : samples) { -if (now - sample.lastWindowMs >= expireAge) +if (now - sample.getLastWindowMs() >= expireAge) sample.reset(now); } } protected static class Sample { -public double initialValue; -public long eventCount; -public long lastWindowMs; -public double value; +private double initialValue; +private long eventCount; +private long lastWindowMs; +private double value; + +/** + * A Sample object could be re-used in a ring buffer to store future samples for space efficiency. + * Thus, a sample could be in either of the following lifecycle states: + * NOT_INITIALIZED: Sample has not been initialized. + * ACTIVE: Sample has values and is currently + * RESET: Sample has been reset and the object is not destroyed so that it could be used for storing future + *samples. + */ +private enum LifecycleState { Review Comment: This code has been removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r862908032 ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -87,8 +103,9 @@ public Sample oldest(long now) { Sample oldest = this.samples.get(0); for (int i = 1; i < this.samples.size(); i++) { Sample curr = this.samples.get(i); -if (curr.lastWindowMs < oldest.lastWindowMs) +if ((curr.getLastWindowMs() < oldest.getLastWindowMs()) && curr.isActive()) { // only consider active samples Review Comment: I have completely got rid of these changes. They were more from useful for defensive programming but changing the public APIs would have required a KIP and also complicated this code review. I will file a separate PR to add these defensive checks back. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r862906685 ## clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java: ## @@ -68,28 +63,61 @@ public double measure(MetricConfig config, long now) { } public long windowSize(MetricConfig config, long now) { -// purge old samples before we compute the window size +// Purge obsolete samples. Obsolete samples are the ones which are not relevant to the current calculation +// because their creation time is outside (before) the duration of time window used to calculate rate. stat.purgeObsoleteSamples(config, now); /* * Here we check the total amount of time elapsed since the oldest non-obsolete window. - * This give the total windowSize of the batch which is the time used for Rate computation. - * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second - * window, the measured rate will be very high. - * Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete. + * This gives the duration of computation time window which used to calculate Rate. + * + * For scenarios when rate computation is performed after at least `config.samples` have completed, + * the duration of computation time window is determined by: + * window duration = (now - start time of oldest non-obsolete window) + * + * ## Special case: First ever window + * A special scenario occurs when rate calculation is performed before at least `config.samples` have completed + * (e.g. if only 1 second has elapsed in a 30 second). In such a scenario, window duration would be equal to the + * time elapsed in the current window (since oldest non-obsolete window is current window). This leads to the + * following values for rate. Consider the following example: + * config.timeWindowMs() = 1s + * config.samples() = 2 + * Record events (E) at timestamps: + * E1 = CurrentTimeStamp (T1) + * E2 = T1 + 30ms + * E2 = T1 + 60ms Review Comment: Fixed. Thanks for point thing this out. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on PR #12045: URL: https://github.com/apache/kafka/pull/12045#issuecomment-1114895474 @mimaison Thinking about it, I can actually reduce the code changes such that no modifications to any public interface is made. Do you still think a KIP is required for this change in that case? (I am new to Kafka so I am not fully sure what qualifies for a KIP vs. what doesn't) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chromy96 commented on a diff in pull request #11916: KAFKA-12703; Allow unencrypted private keys when using PEM files
chromy96 commented on code in PR #11916: URL: https://github.com/apache/kafka/pull/11916#discussion_r862846428 ## clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java: ## @@ -291,7 +289,14 @@ public void testPemKeyStoreFileNoKeyPassword() throws Exception { configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, pemFilePath(pemAsConfigValue(KEY, CERTCHAIN).value())); configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE); -assertThrows(InvalidConfigurationException.class, () -> factory.configure(configs)); +configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, null); +factory.configure(configs); + +KeyStore keyStore = factory.keystore(); +List aliases = Collections.list(keyStore.aliases()); +assertEquals(Collections.singletonList("kafka"), aliases); +assertNotNull(keyStore.getCertificate("kafka"), "Certificate not loaded"); +assertNotNull(keyStore.getKey("kafka", null), "Private key not loaded"); Review Comment: @dajac I found in two places where the mandatory key is mentioned. I've updated this in the latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chromy96 commented on a diff in pull request #11916: KAFKA-12703; Allow unencrypted private keys when using PEM files
chromy96 commented on code in PR #11916: URL: https://github.com/apache/kafka/pull/11916#discussion_r862845752 ## clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java: ## @@ -291,7 +289,14 @@ public void testPemKeyStoreFileNoKeyPassword() throws Exception { configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, pemFilePath(pemAsConfigValue(KEY, CERTCHAIN).value())); configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE); -assertThrows(InvalidConfigurationException.class, () -> factory.configure(configs)); +configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, null); +factory.configure(configs); + +KeyStore keyStore = factory.keystore(); +List aliases = Collections.list(keyStore.aliases()); +assertEquals(Collections.singletonList("kafka"), aliases); +assertNotNull(keyStore.getCertificate("kafka"), "Certificate not loaded"); +assertNotNull(keyStore.getKey("kafka", null), "Private key not loaded"); Review Comment: Thanks for the comment. Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on PR #12045: URL: https://github.com/apache/kafka/pull/12045#issuecomment-1114879179 Thanks for checking @mimaison. As I explained above, the test is flaky since the logic of computation of `Rate` has a bug and hence, in worst case scenario it can exceed whatever thresholds we set on the assertion. Increasing the test threshold will be a hacky way to fix it but that is equivalent to disabling the test altogether. Per your suggestion, let me start a discussion on the mailing list and create a KIP if we reach a consensus. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
mimaison commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r862834421 ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -110,25 +127,40 @@ public String toString() { protected void purgeObsoleteSamples(MetricConfig config, long now) { long expireAge = config.samples() * config.timeWindowMs(); for (Sample sample : samples) { -if (now - sample.lastWindowMs >= expireAge) +if (now - sample.getLastWindowMs() >= expireAge) sample.reset(now); } } protected static class Sample { -public double initialValue; -public long eventCount; -public long lastWindowMs; -public double value; +private double initialValue; Review Comment: This is also part of the public API, so we shouldn't be changing these fields ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -138,6 +170,46 @@ public boolean isComplete(long timeMs, MetricConfig config) { return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow(); } +public boolean isActive() { Review Comment: Again because it's public API we can't add new public methods without having a KIP ## clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java: ## @@ -52,10 +51,6 @@ public Rate(TimeUnit unit, SampledStat stat) { this.unit = unit; } -public String unitName() { Review Comment: `Rate` is part of the public API (https://kafka.apache.org/31/javadoc/org/apache/kafka/common/metrics/stats/Rate.html) so we don't want to remove this method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
Hangleton commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r862786317 ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -87,8 +103,9 @@ public Sample oldest(long now) { Sample oldest = this.samples.get(0); for (int i = 1; i < this.samples.size(); i++) { Sample curr = this.samples.get(i); -if (curr.lastWindowMs < oldest.lastWindowMs) +if ((curr.getLastWindowMs() < oldest.getLastWindowMs()) && curr.isActive()) { // only consider active samples Review Comment: Is the `isActive` really required? Before the oldest sample is computed, expired samples are reset which brings the `lastWindonMs` equal to `now`. Marginal note: this assumes at least one sample is active (that is if all samples between 1 and `samples.size()` are not active, the first sample in the list has to be active and would be the current sample). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530722#comment-17530722 ] François Rosière edited comment on KAFKA-13864 at 5/2/22 12:52 PM: --- [KIP-832|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882578] has been created. was (Author: JIRAUSER288866): KIP-832 has been created. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > Attachments: interceptor_constructor_client.patch > > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530722#comment-17530722 ] François Rosière commented on KAFKA-13864: -- KIP-832 has been created. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > Attachments: interceptor_constructor_client.patch > > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530716#comment-17530716 ] Bruno Cadonna commented on KAFKA-13864: --- [~frosiere] You should be all set now! > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > Attachments: interceptor_constructor_client.patch > > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530707#comment-17530707 ] François Rosière commented on KAFKA-13864: -- Make sense... frosiere is my account name on both Jira and Confluence. Thanks > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > Attachments: interceptor_constructor_client.patch > > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] Hangleton commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
Hangleton commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r862786317 ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -87,8 +103,9 @@ public Sample oldest(long now) { Sample oldest = this.samples.get(0); for (int i = 1; i < this.samples.size(); i++) { Sample curr = this.samples.get(i); -if (curr.lastWindowMs < oldest.lastWindowMs) +if ((curr.getLastWindowMs() < oldest.getLastWindowMs()) && curr.isActive()) { // only consider active samples Review Comment: Does the `isActive` really required? Before the oldest sample is computed, expired samples are reset which brings the `lastWindonMs` equal to `now`. Marginal note: this assumes at least one sample is active (that is if all samples between 1 and `samples.size()` are not active, the first sample in the list has to be active and would be the current sample). ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -110,25 +127,40 @@ public String toString() { protected void purgeObsoleteSamples(MetricConfig config, long now) { long expireAge = config.samples() * config.timeWindowMs(); for (Sample sample : samples) { -if (now - sample.lastWindowMs >= expireAge) +if (now - sample.getLastWindowMs() >= expireAge) sample.reset(now); } } protected static class Sample { -public double initialValue; -public long eventCount; -public long lastWindowMs; -public double value; +private double initialValue; +private long eventCount; +private long lastWindowMs; +private double value; + +/** + * A Sample object could be re-used in a ring buffer to store future samples for space efficiency. + * Thus, a sample could be in either of the following lifecycle states: + * NOT_INITIALIZED: Sample has not been initialized. + * ACTIVE: Sample has values and is currently + * RESET: Sample has been reset and the object is not destroyed so that it could be used for storing future + *samples. + */ +private enum LifecycleState { Review Comment: Cf. comment on calculation of the oldest sample - not sure this is required given `reset` update the `lastWindowMs` of the sample to the "current" timestamp at the time of the reset. ## clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java: ## @@ -68,28 +63,61 @@ public double measure(MetricConfig config, long now) { } public long windowSize(MetricConfig config, long now) { -// purge old samples before we compute the window size +// Purge obsolete samples. Obsolete samples are the ones which are not relevant to the current calculation +// because their creation time is outside (before) the duration of time window used to calculate rate. stat.purgeObsoleteSamples(config, now); /* * Here we check the total amount of time elapsed since the oldest non-obsolete window. - * This give the total windowSize of the batch which is the time used for Rate computation. - * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second - * window, the measured rate will be very high. - * Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete. + * This gives the duration of computation time window which used to calculate Rate. + * + * For scenarios when rate computation is performed after at least `config.samples` have completed, + * the duration of computation time window is determined by: + * window duration = (now - start time of oldest non-obsolete window) + * + * ## Special case: First ever window + * A special scenario occurs when rate calculation is performed before at least `config.samples` have completed + * (e.g. if only 1 second has elapsed in a 30 second). In such a scenario, window duration would be equal to the + * time elapsed in the current window (since oldest non-obsolete window is current window). This leads to the + * following values for rate. Consider the following example: + * config.timeWindowMs() = 1s + * config.samples() = 2 + * Record events (E) at timestamps: + * E1 = CurrentTimeStamp (T1) + * E2 = T1 + 30ms + * E2 = T1 + 60ms Review Comment: E2 -> E3 ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -34,22 +34,38 @@ */ public abstract class SampledStat implements MeasurableStat { -
[jira] [Comment Edited] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530703#comment-17530703 ] Bruno Cadonna edited comment on KAFKA-13864 at 5/2/22 12:06 PM: Any change that impacts the public interface of a class for which the build generates Javadocs is considered a major change and needs a KIP. The [KIP doc|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] is quite explicit about it. Regarding the account: Let me know your account name and I can give you the needed permissions. was (Author: cadonna): Any change that impacts the public interface of a class for which the build generates Javadocs are considered a major change and needs a KIP. The [KIP doc|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] is quite explicit about it. Regarding the account: Let me know your account name and I can give you the needed permissions. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > Attachments: interceptor_constructor_client.patch > > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530703#comment-17530703 ] Bruno Cadonna commented on KAFKA-13864: --- Any change that impacts the public interface of a class for which the build generates Javadocs are considered a major change and needs a KIP. The [KIP doc|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] is quite explicit about it. Regarding the account: Let me know your account name and I can give you the needed permissions. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > Attachments: interceptor_constructor_client.patch > > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Comment Edited] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530697#comment-17530697 ] François Rosière edited comment on KAFKA-13864 at 5/2/22 11:59 AM: --- KIP looks overkill in this specific case as we simply create a new constructor + increase the visibility of one existing constructor, no impact on existing API and the usage is still the exact same. But I can create one when I will have the right setup/accounts. was (Author: JIRAUSER288866): KIP looks overkill in this specific case as we simply create a new constructor + increase the visibility of one existing constructor, no impact on existing API and the usage is still the exact same. But let me create it... > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > Attachments: interceptor_constructor_client.patch > > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530697#comment-17530697 ] François Rosière commented on KAFKA-13864: -- KIP looks overkill in this specific case as we simply create a new constructor + increase the visibility of one existing constructor, no impact on existing API and the usage is still the exact same. But let me create it... > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > Attachments: interceptor_constructor_client.patch > > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530694#comment-17530694 ] Bruno Cadonna commented on KAFKA-13864: --- [~Jack-Lee] Could you please open a GitHub PR against trunk? Reviewing a PR is simpler than an attached patch. The title of the PR should start with "KAFKA-13864:". We still need a KIP before we can merge the PR. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > Attachments: interceptor_constructor_client.patch > > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530689#comment-17530689 ] lqjacklee commented on KAFKA-13864: --- [~frosiere] [~cadonna] please help review the patch. Once we need the KIP , could I take the task? > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > Attachments: interceptor_constructor_client.patch > > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lqjacklee updated KAFKA-13864: -- Attachment: interceptor_constructor_client.patch > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > Attachments: interceptor_constructor_client.patch > > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] divijvaidya commented on pull request #12113: MINOR: Small cleanups in connect/mirror
divijvaidya commented on PR #12113: URL: https://github.com/apache/kafka/pull/12113#issuecomment-1114739908 Hey @mimaison, one of the flaky tests failing for this PR is `testListenerConnectionRateLimitWhenActualRateAboveLimit() – kafka.network.ConnectionQuotasTest` which I have fixed as part of https://github.com/apache/kafka/pull/12045 If you get a chance, please review my PR and that will reduce some degree of flakiness from the existing test suite. Thanks in advance! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530677#comment-17530677 ] Bruno Cadonna commented on KAFKA-13864: --- [~frosiere][~Jack-Lee] I think this ticket needs a KIP since it plans to change the public API. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12106: KAFKA-13861: Fix the validateOnly behavior for CreatePartitions requests in KRaft mode
divijvaidya commented on code in PR #12106: URL: https://github.com/apache/kafka/pull/12106#discussion_r862747446 ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -767,7 +767,7 @@ class ControllerApis(val requestChannel: RequestChannel, setErrorCode(TOPIC_AUTHORIZATION_FAILED.code)) } } -controller.createPartitions(context, topics).thenApply { results => +controller.createPartitions(context, topics, request.validateOnly()).thenApply { results => Review Comment: nit you don't need the parenthesis in scala here. Simply `request.validateOnly` would work. ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1664,13 +1664,14 @@ public CompletableFuture updateFeatures( @Override public CompletableFuture> createPartitions( Review Comment: You can handle the `validateOnly` here and downstream function calls do not have to know about it. This would greatly simplify the code because now `ReplicationControl` does not have to deal with parsing the result and handling the `validateOnly` flag e.g. this function implementation would change to ``` if (topics.isEmpty()) { return CompletableFuture.completedFuture(Collections.emptyList()); } return appendWriteEvent("createPartitions", context.deadlineNs(), () -> { final ControllerResult> result = replicationControl.createPartitions(topics); return validateOnly ? result.withoutRecords() : result; }); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-13864: -- Labels: needs-kip (was: ) > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > Labels: needs-kip > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] François Rosière updated KAFKA-13864: - Description: To allow implementing Spring managed interceptors for producers and consumers, [https://github.com/spring-projects/spring-kafka/issues/2244] a new constructor should be added in KafkaProducer {code:java} public KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer){code} the visibility of one constructor of KafkaConsumer should also move from default to public. {code:java} public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, Deserializer valueDeserializer) {code} see the current implementation https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 This issue is quite blocking , so, any other alternative or proposal would be more than welcome. Kafka streams is not concerned by this issue as the KafkaStreams object is already exposing a constructor taking a StreamsConfig object. Thanks for considering this issue. was: To allow implementing Spring managed interceptors for producers and consumers, [https://github.com/spring-projects/spring-kafka/issues/2244] a new constructor should be added in KafkaProducer {code:java} public KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer){code} the visibility of one constructor of KafkaConsumer should also move from default to public. {code:java} public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, Deserializer valueDeserializer) {code} see the current implementation [here|[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671]] This issue is quite blocking , so, any other alternative or proposal would be more than welcome. Kafka streams is not concerned by this issue as the KafkaStreams object is already exposing a constructor taking a StreamsConfig object. Thanks for considering this issue. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671 > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] François Rosière updated KAFKA-13864: - Description: To allow implementing Spring managed interceptors for producers and consumers, [https://github.com/spring-projects/spring-kafka/issues/2244] a new constructor should be added in KafkaProducer {code:java} public KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer){code} the visibility of one constructor of KafkaConsumer should also move from default to public. {code:java} public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, Deserializer valueDeserializer) {code} see the current implementation [here|[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671]] This issue is quite blocking , so, any other alternative or proposal would be more than welcome. Kafka streams is not concerned by this issue as the KafkaStreams object is already exposing a constructor taking a StreamsConfig object. Thanks for considering this issue. was: To allow implementing Spring managed interceptors for producers and consumers, [https://github.com/spring-projects/spring-kafka/issues/2244] a new constructor should be added in KafkaProducer {code:java} public KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer){code} the visibility of one constructor of KafkaConsumer should also move from default to public. {code:java} public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, Deserializer valueDeserializer) {code} see the current implementation [here|#L321]. This issue is quite blocking , so, any other alternative or proposal would be more than welcome. Kafka streams is not concerned by this issue as the KafkaStreams object is already exposing a constructor taking a StreamsConfig object. Thanks for considering this issue. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation > [here|[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671]] > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Assigned] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lqjacklee reassigned KAFKA-13864: - Assignee: lqjacklee > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Assignee: lqjacklee >Priority: Major > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation [here|#L321]. > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] François Rosière updated KAFKA-13864: - Description: To allow implementing Spring managed interceptors for producers and consumers, [https://github.com/spring-projects/spring-kafka/issues/2244] a new constructor should be added in KafkaProducer {code:java} public KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer){code} the visibility of one constructor of KafkaConsumer should also move from default to public. {code:java} public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, Deserializer valueDeserializer) {code} see the current implementation [here|#L321]. This issue is quite blocking , so, any other alternative or proposal would be more than welcome. Kafka streams is not concerned by this issue as the KafkaStreams object is already exposing a constructor taking a StreamsConfig object. Thanks for considering this issue. was: To allow implementing Spring managed interceptors for producers and consumers, [https://github.com/spring-projects/spring-kafka/issues/2244] a new constructor should be added in KafkaProducer {code:java} public KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer){code} the visibility of one constructor of KafkaConsumer should also move from default to public. {code:java} public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, Deserializer valueDeserializer) {code} see the current implementation [here|#L321]]. This issue is quite blocking , so, any other alternative or proposal would be more than welcome. Kafka streams is not concerned by this issue as the KafkaStreams object is already exposing a constructor taking a StreamsConfig object. Thanks for considering this issue. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Priority: Major > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation [here|#L321]. > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] François Rosière updated KAFKA-13864: - Description: To allow implementing Spring managed interceptors for producers and consumers, [https://github.com/spring-projects/spring-kafka/issues/2244] a new constructor should be added in KafkaProducer {code:java} public KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer){code} the visibility of one constructor of KafkaConsumer should also move from default to public. {code:java} public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, Deserializer valueDeserializer) {code} see the current implementation [here|[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L321]]. This issue is quite blocking , so, any other alternative or proposal would be more than welcome. Kafka streams is not concerned by this issue as the KafkaStreams object is already exposing a constructor taking a StreamsConfig object. Thanks for considering this issue. was: To allow implementing Spring managed interceptors for producers and consumers, [https://github.com/spring-projects/spring-kafka/issues/2244] the visibility of one constructor of KafkaProducer and KafkaConsumer should move from the default modifier to the public modifier. See [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L321] and [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671] For the KafkaProducer, it may make more sense to define an additional constructor {code:java} public KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer){code} This issue is quite blocking , so, any other alternative or proposal would be more than welcome. Kafka streams is not concerned by this issue as the KafkaStreams object is already exposing a constructor taking a StreamsConfig object. Thanks for considering this issue. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Priority: Major > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > > the visibility of one constructor of KafkaConsumer should also move from > default to public. > > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > > see the current implementation > [here|[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L321]]. > > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] François Rosière updated KAFKA-13864: - Description: To allow implementing Spring managed interceptors for producers and consumers, [https://github.com/spring-projects/spring-kafka/issues/2244] a new constructor should be added in KafkaProducer {code:java} public KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer){code} the visibility of one constructor of KafkaConsumer should also move from default to public. {code:java} public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, Deserializer valueDeserializer) {code} see the current implementation [here|#L321]]. This issue is quite blocking , so, any other alternative or proposal would be more than welcome. Kafka streams is not concerned by this issue as the KafkaStreams object is already exposing a constructor taking a StreamsConfig object. Thanks for considering this issue. was: To allow implementing Spring managed interceptors for producers and consumers, [https://github.com/spring-projects/spring-kafka/issues/2244] a new constructor should be added in KafkaProducer {code:java} public KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer){code} the visibility of one constructor of KafkaConsumer should also move from default to public. {code:java} public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, Deserializer valueDeserializer) {code} see the current implementation [here|[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L321]]. This issue is quite blocking , so, any other alternative or proposal would be more than welcome. Kafka streams is not concerned by this issue as the KafkaStreams object is already exposing a constructor taking a StreamsConfig object. Thanks for considering this issue. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Priority: Major > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > a new constructor should be added in KafkaProducer > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > the visibility of one constructor of KafkaConsumer should also move from > default to public. > {code:java} > public KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, > Deserializer valueDeserializer) {code} > see the current implementation [here|#L321]]. > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13864) Change the visibility of a KafkaProducer and KafkaConsumer constructor
[ https://issues.apache.org/jira/browse/KAFKA-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] François Rosière updated KAFKA-13864: - Description: To allow implementing Spring managed interceptors for producers and consumers, [https://github.com/spring-projects/spring-kafka/issues/2244] the visibility of one constructor of KafkaProducer and KafkaConsumer should move from the default modifier to the public modifier. See [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L321] and [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671] For the KafkaProducer, it may make more sense to define an additional constructor {code:java} public KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer){code} This issue is quite blocking , so, any other alternative or proposal would be more than welcome. Kafka streams is not concerned by this issue as the KafkaStreams object is already exposing a constructor taking a StreamsConfig object. Thanks for considering this issue. was: To allow implementing Spring managed interceptors for producers and consumers, [https://github.com/spring-projects/spring-kafka/issues/2244] the visibility of one constructor of KafkaProducer and KafkaConsumer should move from the default modifier to the public modifier. See [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L321] and [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671] For the KafkaProducer, it may make more sense to expose the following constructor {code:java} public KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer){code} Any other alternative or proposal would be more than welcome. Kafka streams are not concerned by this issues as the KafkaStreams object is already exposing a constructor taking a StreamsConfig object. Thanks for considering this issue. > Change the visibility of a KafkaProducer and KafkaConsumer constructor > -- > > Key: KAFKA-13864 > URL: https://issues.apache.org/jira/browse/KAFKA-13864 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.1.0 >Reporter: François Rosière >Priority: Major > > To allow implementing Spring managed interceptors for producers and consumers, > [https://github.com/spring-projects/spring-kafka/issues/2244] > the visibility of one constructor of KafkaProducer and KafkaConsumer should > move from the default modifier to the public modifier. > See > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L321] > and > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L671] > For the KafkaProducer, it may make more sense to define an additional > constructor > {code:java} > public KafkaProducer(ProducerConfig config, Serializer keySerializer, > Serializer valueSerializer){code} > This issue is quite blocking , so, any other alternative or proposal would be > more than welcome. > Kafka streams is not concerned by this issue as the KafkaStreams object is > already exposing a constructor taking a StreamsConfig object. > Thanks for considering this issue. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12112: MINOR: Fix flaky testDescribeUnderReplicatedPartitions
divijvaidya commented on code in PR #12112: URL: https://github.com/apache/kafka/pull/12112#discussion_r862714992 ## core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala: ## @@ -586,11 +586,14 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi try { killBroker(0) val aliveServers = brokers.filterNot(_.config.brokerId == 0) - TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0) + TestUtils.waitUntilTrue( Review Comment: Can we alternatively use one of the existing methods in TestUtils to validate that the topic partition ISR contains rest of the 5 brokers e.g. using `TestUtils.waitForBrokersInIsr` could validate that the topic partition metadata exists in expected number of Isr even after one of the brokers is terminated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13865) Fix ResponseSendTimeMs metric in RequestChannel.scala was removed repeatedly
[ https://issues.apache.org/jira/browse/KAFKA-13865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-13865. --- Fix Version/s: 3.3.0 Resolution: Fixed > Fix ResponseSendTimeMs metric in RequestChannel.scala was removed > repeatedly > --- > > Key: KAFKA-13865 > URL: https://issues.apache.org/jira/browse/KAFKA-13865 > Project: Kafka > Issue Type: Bug >Reporter: zhaobo >Priority: Minor > Fix For: 3.3.0 > > > ResponseSendTimeMs metric was removed in line 576,but we removed it again in > line 578. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] showuon merged pull request #12111: KAFKA-13865: Fix ResponseSendTimeMs metric in RequestChannel.scala was removed repeatedly
showuon merged PR #12111: URL: https://github.com/apache/kafka/pull/12111 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #11844: KAFKA#13702 - Connect RestClient overrides response status code on request failure
mimaison commented on PR #11844: URL: https://github.com/apache/kafka/pull/11844#issuecomment-1114635179 @Corlobin I agree with @C0urante, it would be good to have a test for this. Are you interested in trying the approach that has been suggested? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison opened a new pull request, #12113: MINOR: Small cleanups in connect/mirror
mimaison opened a new pull request, #12113: URL: https://github.com/apache/kafka/pull/12113 - Make a few fields `final` - Remove unnecessary `throws` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-13773) Data loss after recovery from crash due to full hard disk
[ https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-13773: - Assignee: Luke Chen > Data loss after recovery from crash due to full hard disk > - > > Key: KAFKA-13773 > URL: https://issues.apache.org/jira/browse/KAFKA-13773 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 2.8.0, 3.1.0, 2.8.1 >Reporter: Tim Alkemade >Assignee: Luke Chen >Priority: Critical > Attachments: DiskAndOffsets.png, kafka-.zip, kafka-2.7.0vs2.8.0.zip, > kafka-2.8.0-crash.zip, kafka-logfiles.zip, kafka-start-to-finish.zip > > > While doing some testing of Kafka on Kubernetes, the data disk for kafka > filled up, which led to all 3 nodes crashing. I increased the disk size for > all three nodes and started up kafka again (one by one, waiting for the > previous node to become available before starting the next one). After a > little while two out of three nodes had no data anymore. > According to the logs, the log cleaner kicked in and decided that the latest > timestamp on those partitions was '0' (i.e. 1970-01-01), and that is older > than the 2 week limit specified on the topic. > > {code:java} > 2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, > dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files > LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, > largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0] > 2022-03-28 12:17:19,753 INFO Deleted log > /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted. > (kafka.log.LogSegment) [kafka-scheduler-0] > 2022-03-28 12:17:19,754 INFO Deleted offset index > /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted. > (kafka.log.LogSegment) [kafka-scheduler-0] > 2022-03-28 12:17:19,754 INFO Deleted time index > /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted. > (kafka.log.LogSegment) [kafka-scheduler-0]{code} > Using kafka-dump-log.sh I was able to determine that the greatest timestamp > in that file (before deletion) was actually 1648460888636 ( 2022-03-28, > 09:48:08 UTC, which is today). However since this segment was the > 'latest/current' segment much of the file is empty. The code that determines > the last entry (TimeIndex.lastEntryFromIndexFile) doesn't seem to know this > and just read the last position in the file, the file being mostly empty > causes it to read 0 for that position. > The cleaner code seems to take this into account since > UnifiedLog.deleteOldSegments is never supposed to delete the current segment, > judging by the scaladoc, however in this case the check doesn't seem to do > its job. Perhaps the detected highWatermark is wrong? > I've attached the logs and the zipped data directories (data files are over > 3Gb in size when unzipped) > > I've encountered this problem with both kafka 2.8.1 and 3.1.0. > I've also tried changing min.insync.replicas to 2: The issue still occurs. -- This message was sent by Atlassian Jira (v8.20.7#820007)