[jira] [Updated] (KAFKA-14000) Kafka-connect standby server shows empty tasks list
[ https://issues.apache.org/jira/browse/KAFKA-14000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xinyu Zou updated KAFKA-14000:
--
Description:
I'm using Kafka Connect in distributed mode. There are two servers, one active and one standby. The standby server sometimes shows an empty tasks list in the status REST API response.

curl host:8443/connectors/name1/status
{code:java}
{
"connector": {
"state": "RUNNING",
"worker_id": "1.2.3.4:10443"
},
"name": "name1",
"tasks": [],
"type": "source"
} {code}
I enabled TRACE logging and checked. As required, the connect-status topic is set to cleanup.policy=compact, but messages in the topic are not compacted immediately; they are compacted at an interval, so there is usually more than one message with the same key. E.g., when Kafka Connect is launched no connector is running; we then start a new connector, and there will be two messages in the connect-status topic:

status-task-name1 : state=RUNNING, workerId='10.251.170.166:10443', generation=100
status-task-name1 : __

Please check the log file [^kafka-connect-trace.log]. We can see that the task status was removed in the end, even though the empty status was not the newest message in the connect-status topic.

When reading status from the connect-status topic, the worker does not sort messages by generation. [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java] So I think this could be improved: we can either sort the messages after poll, or compare the generation value before choosing the correct status message.

was: (same description without the paragraph referencing [^kafka-connect-trace.log])

> Kafka-connect standby server shows empty tasks list
> ---
>
> Key: KAFKA-14000
> URL: https://issues.apache.org/jira/browse/KAFKA-14000
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.6.0
> Reporter: Xinyu Zou
> Priority: Major
> Attachments: kafka-connect-trace.log

-- This message was sent by Atlassian Jira (v8.20.7#820007)
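For illustration, a minimal sketch of the second remedy the report suggests (compare the generation value before applying a consumed status record). The class and method names here are hypothetical stand-ins, not Connect's real status-store types:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a task status read from the connect-status
// topic (NOT Connect's real classes).
final class TaskStatusEntry {
    final String state;
    final String workerId;
    final int generation;

    TaskStatusEntry(String state, String workerId, int generation) {
        this.state = state;
        this.workerId = workerId;
        this.generation = generation;
    }
}

final class StatusCache {
    private final Map<String, TaskStatusEntry> statusByKey = new ConcurrentHashMap<>();

    // Apply a consumed record only if it is at least as new as what we
    // already hold, so a stale (not-yet-compacted) message cannot
    // overwrite newer state.
    void maybeApply(String key, TaskStatusEntry incoming) {
        statusByKey.merge(key, incoming, (current, candidate) ->
            candidate.generation >= current.generation ? candidate : current);
    }
}
{code}

With this shape, the read order of pre-compaction duplicates no longer matters: the entry with the highest generation always wins.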
[jira] [Updated] (KAFKA-14000) Kafka-connect standby server shows empty tasks list
[ https://issues.apache.org/jira/browse/KAFKA-14000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xinyu Zou updated KAFKA-14000:
--
Attachment: kafka-connect-trace.log

> Kafka-connect standby server shows empty tasks list
> ---
>
> Key: KAFKA-14000
> URL: https://issues.apache.org/jira/browse/KAFKA-14000
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.6.0
> Reporter: Xinyu Zou
> Priority: Major
> Attachments: kafka-connect-trace.log

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (KAFKA-14000) Kafka-connect standby server shows empty tasks list
Xinyu Zou created KAFKA-14000:
--
Summary: Kafka-connect standby server shows empty tasks list
Key: KAFKA-14000
URL: https://issues.apache.org/jira/browse/KAFKA-14000
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 2.6.0
Reporter: Xinyu Zou

I'm using Kafka Connect in distributed mode. There are two servers, one active and one standby. The standby server sometimes shows an empty tasks list in the status REST API response.

curl host:8443/connectors/name1/status
{code:java}
{
"connector": {
"state": "RUNNING",
"worker_id": "1.2.3.4:10443"
},
"name": "name1",
"tasks": [],
"type": "source"
} {code}
I enabled TRACE logging and checked. As required, the connect-status topic is set to cleanup.policy=compact, but messages in the topic are not compacted immediately; they are compacted at an interval, so there is usually more than one message with the same key. E.g., when Kafka Connect is launched no connector is running; we then start a new connector, and there will be two messages in the connect-status topic:

status-task-name1 : state=RUNNING, workerId='10.251.170.166:10443', generation=100
status-task-name1 : __

When reading status from the connect-status topic, the worker does not sort messages by generation. [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java] So I think this could be improved: we can either sort the messages after poll, or compare the generation value before choosing the correct status message.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest
[ https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554884#comment-17554884 ] hudeqi commented on KAFKA-12478:
Hello, Guozhang. I have started a vote on KIP-842 for this issue. Does the status of this issue also need to be updated accordingly? In addition, please review and vote on it, thank you. cc [~showuon]

> Consumer group may lose data for newly expanded partitions when add
> partitions for topic if the group is set to consume from the latest
> ---
>
> Key: KAFKA-12478
> URL: https://issues.apache.org/jira/browse/KAFKA-12478
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Affects Versions: 3.1.1
> Reporter: hudeqi
> Priority: Blocker
> Labels: patch
> Attachments: safe-console-consumer.png, safe-consume.png,
> safe-produce.png, trunk-console-consumer.png, trunk-consume.png,
> trunk-produce.png
>
> Original Estimate: 1,158h
> Remaining Estimate: 1,158h
>
> This problem was exposed in our production environment: a topic is used to
> produce monitoring data. *After we expanded its partitions, the consuming
> business reported that data was lost.*
> After preliminary investigation, the lost data was all concentrated in the
> newly expanded partitions. The reason: when the topic expands, the producer
> perceives the expansion first, and some data is written to the newly
> expanded partitions. The consumer group perceives the expansion later; after
> the rebalance completes, the newly expanded partitions are consumed from the
> latest offset if the group is set to consume from the latest. For a period
> of time, the data in the newly expanded partitions is therefore skipped and
> lost by the consumer.
> Simply setting such a group to consume from the earliest is not an option
> for a high-throughput topic, because on startup the group would consume a
> large amount of historical data from the broker, which would affect broker
> performance to a certain extent. Therefore, *it is necessary to consume
> these partitions from the earliest separately.*
>
> I ran a test; the results are in the attached screenshots. First, the
> producer's and consumer's "metadata.max.age.ms" were set to 500ms and 3ms
> respectively.
> _trunk-console-consumer.png_ shows the consumer started with the community
> version and set to "latest".
> _trunk-produce.png_ shows the data produced: "partition_count" is the number
> of partitions of the topic at that moment, "message" is the numeric content
> of the corresponding message, and "send_to_partition_index" is the index of
> the partition the message was sent to. At 11:32:10 the producer perceives
> the expansion of the total partitions from 2 to 3 and writes the numbers 38,
> 41, and 44 into the newly expanded partition 2.
> _trunk-consume.png_ shows everything consumed by the community version. 38
> and 41, sent to partition 2, were not consumed at first; even after
> partition 2 was perceived, they were still not consumed. Instead,
> consumption started from the latest message, 44, so 38 and 41 were
> discarded.
>
> _safe-console-consumer.png_ shows the consumer started with the fixed
> version and set to "safe_latest".
> _safe-produce.png_ shows the data produced. At 12:12:09 the producer
> perceives the expansion of the total partitions from 4 to 5 and writes the
> numbers 109 and 114 into the newly expanded partition 4.
> _safe-consume.png_ shows everything consumed by the fixed version. 109, sent
> to partition 4, was not consumed at first; after partition 4 was perceived,
> 109 was consumed as the first record of partition 4. So the fixed version
> does not lose data under this condition.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
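For illustration only: the "safe_latest" semantics described above could be roughly approximated today on the client side with a rebalance listener that seeks partitions first seen after startup to the earliest offset. This is an assumption-laden sketch (per-consumer tracking is only approximate, since partitions move between group members; the actual KIP-842 proposal addresses this inside the consumer), not the KIP's implementation:

{code:java}
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SafeLatestListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    private final Set<TopicPartition> knownPartitions = new HashSet<>();
    private boolean firstAssignment = true;

    public SafeLatestListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        if (firstAssignment) {
            // Initial assignment keeps the configured auto.offset.reset=latest.
            firstAssignment = false;
        } else {
            List<TopicPartition> newlySeen = new ArrayList<>();
            for (TopicPartition tp : partitions) {
                if (!knownPartitions.contains(tp)) {
                    newlySeen.add(tp); // appeared after startup: likely an expansion
                }
            }
            if (!newlySeen.isEmpty()) {
                // Read newly expanded partitions from the earliest offset so
                // records produced before the rebalance are not skipped.
                consumer.seekToBeginning(newlySeen);
            }
        }
        knownPartitions.addAll(partitions);
    }
}
{code}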
[GitHub] [kafka] C0urante commented on pull request #11781: KAFKA-10000: Per-connector offsets topics (KIP-618)
C0urante commented on PR #11781: URL: https://github.com/apache/kafka/pull/11781#issuecomment-1157162231 Hey guys--thanks for the reviews, really appreciate the rapid responses here. I found a bug that's been a bit trickier to solve than expected and have had little time to work on it this week. I plan to push the next draft by Friday at the very latest. If it matters, the bug is that the offset stores for regular (non-exactly-once) source tasks, and source connectors, are never started. I'm planning on fixing that first, then adding an integration test case to https://github.com/apache/kafka/pull/11782 to simulate a soft downgrade where someone disables exactly-once support on their worker after creating a connector and letting it run for a bit, and finally, manually auditing the changes for KIP-618 to catch any other potential bugs related to improper initialization or cleanup of resources. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bozhao12 commented on pull request #12286: KAFKA-13984: Fix TopicBasedRemoteLogMetadataManager#initializeResources should exit immediately when partition size of __remote_log_metadata
bozhao12 commented on PR #12286: URL: https://github.com/apache/kafka/pull/12286#issuecomment-1157064520 @divijvaidya Thanks for your review. I added a unit test based on your suggestion. Since a restart operation is involved, I put this unit test in `TopicBasedRemoteLogMetadataManagerRestartTest`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13999) Add ProducerCount metrics (KIP-847)
Artem Livshits created KAFKA-13999: -- Summary: Add ProducerCount metrics (KIP-847) Key: KAFKA-13999 URL: https://issues.apache.org/jira/browse/KAFKA-13999 Project: Kafka Issue Type: Improvement Reporter: Artem Livshits See https://cwiki.apache.org/confluence/display/KAFKA/KIP-847%3A+Add+ProducerCount+metrics -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] mdedetrich commented on pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16
mdedetrich commented on PR #12284: URL: https://github.com/apache/kafka/pull/12284#issuecomment-1156984238 > Is there something you're looking for in 2.12.16? I already had a detailed look into this; the bug only occurs at compile time (see https://github.com/scala/bug/issues/12605#issuecomment-1151427077), which means that unless you hit that specifically mentioned compile error there is no adverse effect. You can see the precise details in the linked ticket at https://github.com/scala/bug/issues/12605, but in summary Kafka is completely unaffected by this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16
ijuma commented on PR #12284: URL: https://github.com/apache/kafka/pull/12284#issuecomment-1156981130 I notice there's a regression: > Scala 2.12.16 contains a regression (https://github.com/scala/bug/issues/12605) that was discovered after the artifacts were published. Only mixed compilation of Scala and Java source files together is affected, and only when the Scala code contains references to certain nested classes in the Java sources. The problem manifests as a compile-time type error. Follow the link for details and workarounds. We'll fix the problem in Scala 2.12.17, which we expect to release in a few months. Is there something you're looking for in 2.12.16? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12241: MINOR: Fix docs in upgrade.html
ijuma commented on code in PR #12241: URL: https://github.com/apache/kafka/pull/12241#discussion_r898459208 ## docs/upgrade.html: ## @@ -1265,7 +1265,7 @@ Notable changes in 1 on live log directories even if there are offline log directories. A log directory may become offline due to IOException caused by hardware failure. Users need to monitor the per-broker metric offlineLogDirectoryCount to check whether there is offline log directory. -Added KafkaStorageException which is a retriable exception. KafkaStorageException will be converted to NotLeaderForPartitionException in the response +Added KafkaStorageException which is a retriable exception. KafkaStorageException will be converted to NotLeaderOrFollowerException in the response Review Comment: Thanks, closing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma closed pull request #12241: MINOR: Fix docs in upgrade.html
ijuma closed pull request #12241: MINOR: Fix docs in upgrade.html URL: https://github.com/apache/kafka/pull/12241 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jnh5y opened a new pull request, #12299: MINOR: Guard against decrementing `totalCommittedSinceLastSummary` du…
jnh5y opened a new pull request, #12299: URL: https://github.com/apache/kafka/pull/12299 …ring rebalancing. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jnh5y commented on pull request #12298: KAFKA-13998 JoinGroupRequestData 'reason' can be too large
jnh5y commented on PR #12298: URL: https://github.com/apache/kafka/pull/12298#issuecomment-1156967918 I tried to find a way to create a unit test for this change, but I wasn't able to do so quickly. If someone has a suggestion for how to do that, I'm happy to follow through with it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jnh5y opened a new pull request, #12298: KAFKA-13998 JoinGroupRequestData 'reason' can be too large
jnh5y opened a new pull request, #12298: URL: https://github.com/apache/kafka/pull/12298 This fix follows the pattern which is established in `AbstractCoordinator.java` of setting the request reason with the method `requestRejoin`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bozhao12 commented on a diff in pull request #12241: MINOR: Fix docs in upgrade.html
bozhao12 commented on code in PR #12241: URL: https://github.com/apache/kafka/pull/12241#discussion_r898448690 ## docs/upgrade.html: ## @@ -1265,7 +1265,7 @@ Notable changes in 1 on live log directories even if there are offline log directories. A log directory may become offline due to IOException caused by hardware failure. Users need to monitor the per-broker metric offlineLogDirectoryCount to check whether there is offline log directory. -Added KafkaStorageException which is a retriable exception. KafkaStorageException will be converted to NotLeaderForPartitionException in the response +Added KafkaStorageException which is a retriable exception. KafkaStorageException will be converted to NotLeaderOrFollowerException in the response Review Comment: Indeed, this is reasonable, thanks for your review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13998) JoinGroupRequestData 'reason' can be too large
Jim Hughes created KAFKA-13998:
--
Summary: JoinGroupRequestData 'reason' can be too large
Key: KAFKA-13998
URL: https://issues.apache.org/jira/browse/KAFKA-13998
Project: Kafka
Issue Type: Bug
Affects Versions: 3.2.0
Reporter: Jim Hughes
Assignee: Jim Hughes

We saw an exception like this:
```
org.apache.kafka.streams.errors.StreamsException: java.lang.RuntimeException: 'reason' field is too long to be serialized
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
Caused by: java.lang.RuntimeException: 'reason' field is too long to be serialized
    at org.apache.kafka.common.message.JoinGroupRequestData.addSize(JoinGroupRequestData.java:465)
    at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
    at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
    at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
    at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
    at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
    at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:499)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:437)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:371)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:542)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1215)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:969)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:917)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:736)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
    ... 1 more
```
This appears to be caused by the code passing an entire stack trace in the `rejoinReason`. See https://github.com/apache/kafka/blob/3.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L481

-- This message was sent by Atlassian Jira (v8.20.7#820007)
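For illustration, a minimal sketch of one way to guard the request field: cap the reason string before it is set. The 255-character limit below is an assumed value for the sketch, not necessarily what the actual fix uses:

{code:java}
// Sketch of guarding the JoinGroup "reason" length before it is sent.
// The 255-character cap is an assumed value for illustration.
final class RejoinReasons {
    private static final int MAX_REASON_LENGTH = 255; // assumed cap

    static String truncateIfNeeded(String reason) {
        if (reason == null || reason.length() <= MAX_REASON_LENGTH) {
            return reason;
        }
        // Keep only the head: a full stack trace adds little value to the
        // coordinator and can blow past the string-field size limit.
        return reason.substring(0, MAX_REASON_LENGTH);
    }
}
{code}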
[GitHub] [kafka] guozhangwang commented on pull request #12121: KAFKA-13846: Adding overloaded addMetricIfAbsent method
guozhangwang commented on PR #12121: URL: https://github.com/apache/kafka/pull/12121#issuecomment-1156954295 > @guozhangwang , which file does this correspond to? that update the web docs on 3.3 release new API changes Here: https://github.com/apache/kafka/blob/trunk/docs/upgrade.html. You can find earlier PRs how they update the upgrade guide / API changes in upcoming releases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #12204: [9/N WIP][Emit final] Emit final for session window aggregations
mjsax commented on code in PR #12204: URL: https://github.com/apache/kafka/pull/12204#discussion_r898411852 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java: ## @@ -61,6 +65,18 @@ public QueryResult query(final Query query, ); } +@Override +public KeyValueIterator, byte[]> findSessions(final Instant earliestSessionEndTime, + final Instant latestSessionEndTime) { +final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime, +prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")); +final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime, +prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime")); + +final KeyValueIterator bytesIterator = wrapped().fetchAll(earliestEndTime, latestEndTime); Review Comment: If I read the code correctly, what `fetchAll()` does is correct: from my understanding, `fetchAll()` is implemented to find "overlapping sessions" given a lower and upper bound -- the lower bound must be smaller than the session end and the upper bound must be larger than the session start for an overlap. Because the upper bound compares to session start, and we use the "base", we need to search the full "data/base part" of the store. I guess the issue is that you actually cannot use `fetchAll()` at all for our purpose here? Passing in `lastEndTime` does not work (does it?), as it would be used to compare to session start times, but we want to compare to session end times. -- Thus, I think the right solution is to actually also add the new `findSessions()` to the internal `SegmentedStore` and implement a proper iterator there? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #12204: [9/N WIP][Emit final] Emit final for session window aggregations
mjsax commented on code in PR #12204: URL: https://github.com/apache/kafka/pull/12204#discussion_r898406030 ## streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java: ## @@ -39,6 +39,13 @@ */ public interface SessionStore extends StateStore, ReadOnlySessionStore { +// TODO: javadoc; both ends are inclusive +default KeyValueIterator, AGG> findSessions(final Instant earliestSessionEndTime, Review Comment: I think there is no way around it? In the end, we allow users to plug in a custom session store -- thus, if they use the new emit-final feature, they will need to implement this new method. Existing code with custom session stores should not break, because existing code neither implements nor uses this new method. If we don't make it public API, we would prevent users from passing in custom session stores in combination with the new emit-final feature, which seems too restrictive? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12204: [9/N WIP][Emit final] Emit final for session window aggregations
guozhangwang commented on code in PR #12204: URL: https://github.com/apache/kafka/pull/12204#discussion_r880784817 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java: ## @@ -61,6 +65,18 @@ public QueryResult query(final Query query, ); } +@Override +public KeyValueIterator, byte[]> findSessions(final Instant earliestSessionEndTime, + final Instant latestSessionEndTime) { +final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime, +prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")); +final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime, +prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime")); + +final KeyValueIterator bytesIterator = wrapped().fetchAll(earliestEndTime, latestEndTime); Review Comment: This is the second open question: with the current prefixed (base, i.e. time-first) session key schema, this fetchAll would effectively be searching for `[earliestEnd, INF]` because of this logic: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java#L46 This is because we translate the range query without a key inside `AbstractRocksDBTimeOrderedSegmentedBytesStore` by using the `lower/upperRange` (instead of `lower/upperRangeFixedSize`): https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java#L241-L242 I cannot remember why we need to do this. @lihaosky @mjsax do you remember why? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #12204: [9/N WIP][Emit final] Emit final for session window aggregations
mjsax commented on code in PR #12204: URL: https://github.com/apache/kafka/pull/12204#discussion_r898340071 ## streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java: ## @@ -202,25 +205,43 @@ public void remove(final Windowed sessionKey) { @Override public byte[] fetchSession(final Bytes key, - final long earliestSessionEndTime, - final long latestSessionStartTime) { + final long sessionStartTime, + final long sessionEndTime) { removeExpiredSegments(); Objects.requireNonNull(key, "key cannot be null"); // Only need to search if the record hasn't expired yet -if (latestSessionStartTime > observedStreamTime - retentionPeriod) { -final ConcurrentNavigableMap> keyMap = endTimeMap.get(latestSessionStartTime); +if (sessionEndTime > observedStreamTime - retentionPeriod) { +final ConcurrentNavigableMap> keyMap = endTimeMap.get(sessionEndTime); if (keyMap != null) { final ConcurrentNavigableMap startTimeMap = keyMap.get(key); if (startTimeMap != null) { -return startTimeMap.get(earliestSessionEndTime); +return startTimeMap.get(sessionStartTime); } } } return null; } +@Override +public KeyValueIterator, byte[]> findSessions(final Instant earliestSessionEndTime, + final Instant latestSessionEndTime) { +removeExpiredSegments(); + +final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime, +prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")); +final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime, +prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime")); + +// since subMap is exclusive on toKey, we need to plus one +return registerNewIterator(null, + null, +Long.MAX_VALUE, +endTimeMap.subMap(earliestEndTime, latestEndTime + 1).entrySet().iterator(), +true); Review Comment: Ok. I read the code of `InMemorySessionStore` in detail and now understand what's going on. This LGTM. 
## streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java: ## @@ -202,25 +205,43 @@ public void remove(final Windowed sessionKey) { @Override public byte[] fetchSession(final Bytes key, - final long earliestSessionEndTime, - final long latestSessionStartTime) { + final long sessionStartTime, + final long sessionEndTime) { removeExpiredSegments(); Objects.requireNonNull(key, "key cannot be null"); // Only need to search if the record hasn't expired yet -if (latestSessionStartTime > observedStreamTime - retentionPeriod) { -final ConcurrentNavigableMap> keyMap = endTimeMap.get(latestSessionStartTime); +if (sessionEndTime > observedStreamTime - retentionPeriod) { +final ConcurrentNavigableMap> keyMap = endTimeMap.get(sessionEndTime); if (keyMap != null) { final ConcurrentNavigableMap startTimeMap = keyMap.get(key); if (startTimeMap != null) { -return startTimeMap.get(earliestSessionEndTime); +return startTimeMap.get(sessionStartTime); } } } return null; } +@Override +public KeyValueIterator, byte[]> findSessions(final Instant earliestSessionEndTime, + final Instant latestSessionEndTime) { +removeExpiredSegments(); + +final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime, +prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")); +final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime, +prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime")); + +// since subMap is exclusive on toKey, we need to plus one +return registerNewIterator(null, + null, +Long.MAX_VALUE, Review Comment: nit: indentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #12204: [9/N WIP][Emit final] Emit final for session window aggregations
mjsax commented on code in PR #12204: URL: https://github.com/apache/kafka/pull/12204#discussion_r898339592 ## streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java: ## @@ -202,25 +205,43 @@ public void remove(final Windowed sessionKey) { @Override public byte[] fetchSession(final Bytes key, - final long earliestSessionEndTime, - final long latestSessionStartTime) { + final long sessionStartTime, + final long sessionEndTime) { removeExpiredSegments(); Objects.requireNonNull(key, "key cannot be null"); // Only need to search if the record hasn't expired yet -if (latestSessionStartTime > observedStreamTime - retentionPeriod) { -final ConcurrentNavigableMap> keyMap = endTimeMap.get(latestSessionStartTime); +if (sessionEndTime > observedStreamTime - retentionPeriod) { +final ConcurrentNavigableMap> keyMap = endTimeMap.get(sessionEndTime); if (keyMap != null) { final ConcurrentNavigableMap startTimeMap = keyMap.get(key); if (startTimeMap != null) { -return startTimeMap.get(earliestSessionEndTime); +return startTimeMap.get(sessionStartTime); } } } return null; } +@Override +public KeyValueIterator, byte[]> findSessions(final Instant earliestSessionEndTime, + final Instant latestSessionEndTime) { +removeExpiredSegments(); + +final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime, +prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")); +final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime, +prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime")); + +// since subMap is exclusive on toKey, we need to plus one +return registerNewIterator(null, + null, +Long.MAX_VALUE, +endTimeMap.subMap(earliestEndTime, latestEndTime + 1).entrySet().iterator(), Review Comment: Nit: can we call `subMap(earliestEndTime, true, latestEndTime, true)` which is the same thing but more "intuitive" as we always search for _inclusive_ bounds throughout the code (otherwise, this is the only place which has an exclusive upper bound). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
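For reference, the two `subMap` forms in the comment above are equivalent for an inclusive `[earliestEndTime, latestEndTime]` range; a small standalone illustration using only the standard `java.util.concurrent` API (nothing Kafka-specific):

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Both forms select the same inclusive [earliestEndTime, latestEndTime]
// range; the four-argument overload just states the bounds explicitly.
public class SubMapBounds {
    public static void main(String[] args) {
        ConcurrentNavigableMap<Long, String> endTimeMap = new ConcurrentSkipListMap<>();
        endTimeMap.put(10L, "a");
        endTimeMap.put(20L, "b");
        endTimeMap.put(30L, "c");

        long earliestEndTime = 10L;
        long latestEndTime = 20L;

        // Exclusive toKey, so the upper bound needs a "+ 1".
        System.out.println(endTimeMap.subMap(earliestEndTime, latestEndTime + 1).keySet()); // [10, 20]
        // Inclusive on both ends, matching the convention used elsewhere.
        System.out.println(endTimeMap.subMap(earliestEndTime, true, latestEndTime, true).keySet()); // [10, 20]
    }
}
```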
[jira] [Commented] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer
[ https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554742#comment-17554742 ] Matthias J. Sax commented on KAFKA-13939: - Thanks for the PR. I added you to the list of contributors and assigned the ticket to you. You can now also self-assign tickets. > Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer > - > > Key: KAFKA-13939 > URL: https://issues.apache.org/jira/browse/KAFKA-13939 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Jackson Newhouse >Assignee: Jackson Newhouse >Priority: Blocker > > If `loggingEnabled` is false, the `dirtyKeys` Set is not cleared within > `flush()`, see > [https://github.com/apache/kafka/blob/3.2/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java#L262.] > However, dirtyKeys is still written to in the loop within `evictWhile`. This > causes dirtyKeys to continuously grow for the life of the buffer. -- This message was sent by Atlassian Jira (v8.20.7#820007)
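For illustration, a simplified stand-in (NOT the real InMemoryTimeOrderedKeyValueBuffer) showing the leak described above and the obvious shape of a fix — clear dirtyKeys in flush() regardless of whether logging is enabled:

{code:java}
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for the buffer, with method names echoing the
// JIRA text; not the real Streams class.
class BufferSketch {
    private final Set<String> dirtyKeys = new HashSet<>();
    private final boolean loggingEnabled;

    BufferSketch(boolean loggingEnabled) {
        this.loggingEnabled = loggingEnabled;
    }

    void evictWhile(String key) {
        dirtyKeys.add(key); // written regardless of the logging setting
    }

    void flush() {
        if (loggingEnabled) {
            // ... write dirty entries to the changelog topic ...
        }
        // Clearing unconditionally prevents the set from growing for the
        // life of the buffer when logging is disabled.
        dirtyKeys.clear();
    }
}
{code}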
[jira] [Assigned] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer
[ https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-13939: --- Assignee: Jackson Newhouse > Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer > - > > Key: KAFKA-13939 > URL: https://issues.apache.org/jira/browse/KAFKA-13939 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Jackson Newhouse >Assignee: Jackson Newhouse >Priority: Blocker > > If `loggingEnabled` is false, the `dirtyKeys` Set is not cleared within > `flush()`, see > [https://github.com/apache/kafka/blob/3.2/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java#L262.] > However, dirtyKeys is still written to in the loop within `evictWhile`. This > causes dirtyKeys to continuously grow for the life of the buffer. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13888) KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag
[ https://issues.apache.org/jira/browse/KAFKA-13888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554738#comment-17554738 ] Niket Goel commented on KAFKA-13888: The PR that adds the API handler to the admin client (https://github.com/apache/kafka/pull/12206) has now been merged. The API does not return any value for the newly added fields at this time. [~Jack-Lee] has a draft PR to add the implementation for the fields. I am following up with him to iterate on his PR. > KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag > -- > > Key: KAFKA-13888 > URL: https://issues.apache.org/jira/browse/KAFKA-13888 > Project: Kafka > Issue Type: Improvement > Components: kraft >Reporter: Niket Goel >Assignee: lqjacklee >Priority: Major > Fix For: 3.3.0 > > > Tracking issue for the implementation of KIP:836 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Comment Edited] (KAFKA-13888) KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag
[ https://issues.apache.org/jira/browse/KAFKA-13888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554738#comment-17554738 ] Niket Goel edited comment on KAFKA-13888 at 6/15/22 5:59 PM: - The PR that adds the API handler to the admin client (https://github.com/apache/kafka/pull/12206) has now been merged. The API does not return any value for the newly added fields at this time. [~Jack-Lee] has a draft PR (https://github.com/apache/kafka/pull/12212) to add the implementation for the fields. I am following up with him to iterate on his PR. was (Author: niket goel): The PR that adds the API handler to the admin client (https://github.com/apache/kafka/pull/12206) has now been merged. The API does not return any value for the newly added fields at this time. [~Jack-Lee] has a draft PR to add the implementation for the fields. I am following up with him to iterate on his PR. > KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag > -- > > Key: KAFKA-13888 > URL: https://issues.apache.org/jira/browse/KAFKA-13888 > Project: Kafka > Issue Type: Improvement > Components: kraft >Reporter: Niket Goel >Assignee: lqjacklee >Priority: Major > Fix For: 3.3.0 > > > Tracking issue for the implementation of KIP:836 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] mjsax commented on a diff in pull request #12293: KAFKA-13963: Clarified java doc for processors api
mjsax commented on code in PR #12293: URL: https://github.com/apache/kafka/pull/12293#discussion_r898266334 ## streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java: ## @@ -30,6 +30,7 @@ * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology * {@link Topology#addSource(String, String...) reads} from the same topic. + * Processors and Transformers created with the Processor API are treated as black boxes and are not represented in the topology graph. Review Comment: Can you update the PR to re-phrase it so it's easier to understand? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] niket-goel commented on pull request #12212: Kafka 13888 new fields
niket-goel commented on PR #12212: URL: https://github.com/apache/kafka/pull/12212#issuecomment-1156768917 Hey @lqjack . Thanks for raising this PR. The PR https://github.com/apache/kafka/pull/12206 has now been merged to trunk. Can you please update your PR with the latest code so we can iterate on it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #12121: KAFKA-13846: Adding overloaded addMetricIfAbsent method
vamossagar12 commented on PR #12121: URL: https://github.com/apache/kafka/pull/12121#issuecomment-1156764298 @ijuma, I updated the PR name. Also, I created a follow-up PR to address some of the comments: https://github.com/apache/kafka/pull/12297 @guozhangwang, which file does this correspond to? ` that update the web docs on 3.3 release new API changes` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
philipnee commented on PR #12149: URL: https://github.com/apache/kafka/pull/12149#issuecomment-1156747595 @ijuma - I think @hachikuji is reviewing it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #12285: KAFKA-7342 Part 1: Straightforward JUnit4 to JUnit5 migrations
clolov commented on PR #12285: URL: https://github.com/apache/kafka/pull/12285#issuecomment-1156744613 Hello! Thank you to everyone who has left a comment and suggestions for improvement. In the next few days I will aim to rework this pull request. In summary: * I will revert the import reordering * I will not prefix assertions with Assertions * I will mention that these changes are for the streams module * I will split the PR into multiple ones so to stick to the <= 500 lines rule -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12233: MINOR: Clean up tmp files created by tests
divijvaidya commented on code in PR #12233: URL: https://github.com/apache/kafka/pull/12233#discussion_r898241099 ## metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java: ## @@ -31,9 +35,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class BootstrapMetadataTest { +private Path tmpDir; Review Comment: I think we should continue to use Path because: 1. `BootstrapMetadata.write()`, used in every test in this file, accepts a `Path`. 2. Prior to this change, we were using Path in this test file, so the current PR doesn't change anything on that front. 3. The new Java NIO.2 API introduced many new (aka better) helper methods via `Files.*` which use `Path` instead of `File`. I have already started replacing the older `java.io` methods with the new NIO.2 methods in the code base [1]. Due to the benefits of the new helper methods, I think we should continue to use `Path` instead of `File`. However, if you feel strongly about this comment, please let me know and I will make the change accordingly. [1] https://issues.apache.org/jira/browse/KAFKA-13928 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
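For context, a small example of the NIO.2 helpers the comment refers to — `Files.*` methods operate directly on `Path`, which is part of the argument for keeping `Path` here. The directory prefix and file name below are hypothetical:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;

public class TmpDirExample {
    public static void main(String[] args) throws IOException {
        // Create and clean up a temp dir purely through Path-based helpers.
        Path tmpDir = Files.createTempDirectory("bootstrap-metadata-test-");
        try {
            Path file = tmpDir.resolve("example.txt"); // hypothetical file name
            Files.writeString(file, "placeholder");
            System.out.println(Files.readString(file));
        } finally {
            // Delete children before parents by walking in reverse order.
            try (var paths = Files.walk(tmpDir)) {
                paths.sorted(Comparator.reverseOrder())
                     .forEach(p -> p.toFile().delete());
            }
        }
    }
}
```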
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16
mdedetrich commented on code in PR #12284: URL: https://github.com/apache/kafka/pull/12284#discussion_r898217940 ## docs/upgrade.html: ## @@ -59,6 +59,9 @@ Upgrading to 3.2.0 from any vers (or to take advantage of exactly once semantics), the newer Java clients must be used. +Upgrade from Scala 2.12.15 to 2.12.16. See https://github.com/scala/scala/releases/tag/v2.12.16 for release Review Comment: I removed the release notes and force-pushed the branch. The PR description contains the release notes instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
ijuma commented on PR #12149: URL: https://github.com/apache/kafka/pull/12149#issuecomment-1156718718 @philipnee where are we with this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #12148: MINOR: Remove unnecessary log4j-appender dependency and tweak explicit log4j dependency
ijuma commented on PR #12148: URL: https://github.com/apache/kafka/pull/12148#issuecomment-1156718290 @omkreddy maybe you can help review this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma closed pull request #12232: MINOR:rm deprecated method
ijuma closed pull request #12232: MINOR:rm deprecated method URL: https://github.com/apache/kafka/pull/12232 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #12232: MINOR:rm deprecated method
ijuma commented on PR #12232: URL: https://github.com/apache/kafka/pull/12232#issuecomment-1156711344 Yes, we only remove deprecated methods during major releases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #12233: MINOR: Clean up tmp files created by tests
ijuma commented on PR #12233: URL: https://github.com/apache/kafka/pull/12233#issuecomment-1156710957 Thanks for the PR. It looks reasonable, just one nit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12233: MINOR: Clean up tmp files created by tests
ijuma commented on code in PR #12233: URL: https://github.com/apache/kafka/pull/12233#discussion_r898209292 ## metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java: ## @@ -31,9 +35,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class BootstrapMetadataTest { +private Path tmpDir; Review Comment: Not sure we gain much by using `Path` since all our utility methods work with `File`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12241: MINOR: Fix docs in upgrade.html
ijuma commented on code in PR #12241: URL: https://github.com/apache/kafka/pull/12241#discussion_r898207431 ## docs/upgrade.html: ## @@ -1265,7 +1265,7 @@ Notable changes in 1 on live log directories even if there are offline log directories. A log directory may become offline due to IOException caused by hardware failure. Users need to monitor the per-broker metric offlineLogDirectoryCount to check whether there is offline log directory. -Added KafkaStorageException which is a retriable exception. KafkaStorageException will be converted to NotLeaderForPartitionException in the response +Added KafkaStorageException which is a retriable exception. KafkaStorageException will be converted to NotLeaderOrFollowerException in the response Review Comment: This is for 1.0.0, at the time it was called `NotLeaderForPartitionException`. I don't think we want to update old release notes in this way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #12278: MINOR: add AuthorizerNotReadyException
ijuma commented on PR #12278: URL: https://github.com/apache/kafka/pull/12278#issuecomment-1156706455 Do we have a KIP for this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #12290: MINOR: Stop leaking threads in BlockingConnectorTest
ijuma commented on PR #12290: URL: https://github.com/apache/kafka/pull/12290#issuecomment-1156705915 @kkonstantine can you please review this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16
ijuma commented on code in PR #12284: URL: https://github.com/apache/kafka/pull/12284#discussion_r898205328 ## docs/upgrade.html: ## @@ -59,6 +59,9 @@ Upgrading to 3.2.0 from any vers (or to take advantage of exactly once semantics), the newer Java clients must be used. +Upgrade from Scala 2.12.15 to 2.12.16. See https://github.com/scala/scala/releases/tag/v2.12.16 for release Review Comment: We don't usually add patch upgrades to the release notes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #12285: KAFKA-7342 Part 1: Straightforward JUnit4 to JUnit5 migrations
ijuma commented on PR #12285: URL: https://github.com/apache/kafka/pull/12285#issuecomment-1156705224 > Worth putting in a separate PR, but have you tried enabling the Jupiter parallel test runner? When I ran it on a work project, it improved build times by an order of magnitude We use multiple forks at the gradle level, so we should not enable this. A couple more comments: 1. Let's revert the formatting changes. 2. Do not prefix with `Assertions`. 3. Mention in the PR title that these changes are for the streams module(s).
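For illustration, a minimal JUnit 5 sketch of what comment 2 asks for (a hypothetical test class, not code from the PR):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

// Prefer statically imported assertions over the Assertions.* prefix.
class ExampleMigrationTest {
    @Test
    void shouldAddNumbers() {
        assertEquals(4, 2 + 2); // preferred
        // discouraged: org.junit.jupiter.api.Assertions.assertEquals(4, 2 + 2);
    }
}
```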
[jira] [Reopened] (KAFKA-13888) KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag
[ https://issues.apache.org/jira/browse/KAFKA-13888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reopened KAFKA-13888: - > KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag > -- > > Key: KAFKA-13888 > URL: https://issues.apache.org/jira/browse/KAFKA-13888 > Project: Kafka > Issue Type: Improvement > Components: kraft >Reporter: Niket Goel >Assignee: lqjacklee >Priority: Major > Fix For: 3.3.0 > > > Tracking issue for the implementation of KIP:836
[GitHub] [kafka] hachikuji commented on pull request #12212: Kafka 13888 new fields
hachikuji commented on PR #12212: URL: https://github.com/apache/kafka/pull/12212#issuecomment-1156686396 @lqjack Can you merge with trunk please?
[jira] [Resolved] (KAFKA-13888) KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag
[ https://issues.apache.org/jira/browse/KAFKA-13888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13888. - Fix Version/s: 3.3.0 Resolution: Fixed > KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag > -- > > Key: KAFKA-13888 > URL: https://issues.apache.org/jira/browse/KAFKA-13888 > Project: Kafka > Issue Type: Improvement > Components: kraft >Reporter: Niket Goel >Assignee: lqjacklee >Priority: Major > Fix For: 3.3.0 > > > Tracking issue for the implementation of KIP:836
[GitHub] [kafka] hachikuji merged pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
hachikuji merged PR #12206: URL: https://github.com/apache/kafka/pull/12206
[GitHub] [kafka] dajac closed pull request #12292: MINOR: KRaft nodes not shutdown correctly when using one controller in colocated mode
dajac closed pull request #12292: MINOR: KRaft nodes not shutdown correctly when using one controller in colocated mode URL: https://github.com/apache/kafka/pull/12292
[GitHub] [kafka] dajac commented on pull request #12292: MINOR: KRaft nodes not shutdown correctly when using one controller in colocated mode
dajac commented on PR #12292: URL: https://github.com/apache/kafka/pull/12292#issuecomment-1156567953 Already fixed by https://github.com/apache/kafka/pull/11238.
[GitHub] [kafka] dajac merged pull request #11238: MINOR: Fix force kill of KRaft colocated controllers in system tests
dajac merged PR #11238: URL: https://github.com/apache/kafka/pull/11238
[GitHub] [kafka] tyamashi-oss opened a new pull request, #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically
tyamashi-oss opened a new pull request, #12296: URL: https://github.com/apache/kafka/pull/12296
- Implementation:
  - Add updateDesiredRatePerSec() on Throttler
  - Call updateDesiredRatePerSec() of Throttler with the new log.cleaner.io.max.bytes.per.second value in reconfigure() of LogCleaner
  - I implemented the feature to be similar to [reconfigure() of SocketServer](https://github.com/apache/kafka/blob/fa59be4e770627cd34cef85986b58ad7f606928d/core/src/main/scala/kafka/network/SocketServer.scala#L336-L357)
- Alternative implementation considered:
  - Re-instantiate Throttler with the new log.cleaner.io.max.bytes.per.second value in reconfigure() of LogCleaner
  - However, since many parameters are required to instantiate Throttler, I chose to follow SocketServer and update only log.cleaner.io.max.bytes.per.second
- Test:
  - Added a unit test for updating the desired rate of Throttler
  - I confirmed by manual testing that log.cleaner.io.max.bytes.per.second can be changed using bin/kafka-configs.sh: > [2022-06-15 22:44:03,089] INFO [kafka-log-cleaner-thread-0]: Log cleaner thread 0 cleaned log my-topic-0 (dirty section = [57585, 86901]) 2,799.3 MB of log processed in 596.0 seconds (4.7 MB/sec). Indexed 2,799.2 MB in 298.1 seconds (9.4 Mb/sec, 50.0% of total time) Buffer utilization: 0.0% Cleaned 2,799.3 MB in 298.0 seconds (9.4 Mb/sec, 50.0% of total time) Start size: 2,799.3 MB (29,317 messages) End size: 0.1 MB (1 messages) 100.0% size reduction (100.0% fewer messages) (kafka.log.LogCleaner)

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
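As a rough illustration of the mechanism described above, here is a hedged Java sketch of a rate limiter whose target rate can be updated at runtime. Kafka's actual `Throttler` is written in Scala and differs in detail; the class and method names below are illustrative only.

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of a throttler with an updatable desired rate, mirroring the
// idea of updateDesiredRatePerSec() in the PR description. Assumes rate > 0.
class SimpleThrottler {
    private final AtomicLong desiredRatePerSec;

    SimpleThrottler(long initialBytesPerSec) {
        this.desiredRatePerSec = new AtomicLong(initialBytesPerSec);
    }

    // Would be called from the cleaner's reconfigure() when
    // log.cleaner.io.max.bytes.per.second changes dynamically.
    void updateDesiredRatePerSec(long newBytesPerSec) {
        desiredRatePerSec.set(newBytesPerSec);
    }

    // Sleeps just long enough that `amountBytes` over `elapsedMs` stays at or
    // below the current target rate.
    void maybeThrottle(long amountBytes, long elapsedMs) throws InterruptedException {
        long minElapsedMs = (amountBytes * 1000) / desiredRatePerSec.get();
        if (elapsedMs < minElapsedMs) {
            Thread.sleep(minElapsedMs - elapsedMs);
        }
    }
}
```

Updating an atomic field in place avoids re-instantiating the throttler, which matches the rationale given above for following the SocketServer pattern.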
[jira] [Created] (KAFKA-13996) log.cleaner.io.max.bytes.per.second cannot be changed dynamically
Tomonari Yamashita created KAFKA-13996: -- Summary: log.cleaner.io.max.bytes.per.second cannot be changed dynamically Key: KAFKA-13996 URL: https://issues.apache.org/jira/browse/KAFKA-13996 Project: Kafka Issue Type: Bug Components: config, core, log cleaner Affects Versions: 3.2.0 Reporter: Tomonari Yamashita Assignee: Tomonari Yamashita
- log.cleaner.io.max.bytes.per.second cannot be changed dynamically using bin/kafka-configs.sh
- Reproduction procedure:
 -# Create a topic with cleanup.policy=compact {code:java} bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic my-topic --config cleanup.policy=compact --config segment.bytes=104857600 --config compression.type=producer {code}
 -# Change log.cleaner.io.max.bytes.per.second=10485760 using bin/kafka-configs.sh {code:java} bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.io.max.bytes.per.second=10485760 {code}
 -# Send enough messages (> segment.bytes=104857600) to activate the Log Cleaner
 -# In logs/log-cleaner.log, the configured log.cleaner.io.max.bytes.per.second=10485760 is not reflected and the Log Cleaner does not slow down (>= log.cleaner.io.max.bytes.per.second=10485760). {code:java} [2022-06-15 14:52:14,988] INFO [kafka-log-cleaner-thread-0]: Log cleaner thread 0 cleaned log my-topic-0 (dirty section = [39786, 81666]) 3,999.0 MB of log processed in 2.7 seconds (1,494.4 MB/sec). Indexed 3,998.9 MB in 0.9 seconds (4,218.2 Mb/sec, 35.4% of total time) Buffer utilization: 0.0% Cleaned 3,999.0 MB in 1.7 seconds (2,314.2 Mb/sec, 64.6% of total time) Start size: 3,999.0 MB (41,881 messages) End size: 0.1 MB (1 messages) 100.0% size reduction (100.0% fewer messages) (kafka.log.LogCleaner) {code}
- Problem cause:
 -- log.cleaner.io.max.bytes.per.second is used by the Throttler in LogCleaner; however, it is only passed to the Throttler at initialization time.
 --- https://github.com/apache/kafka/blob/4380eae7ceb840dd93fee8ec90cd89a72bad7a3f/core/src/main/scala/kafka/log/LogCleaner.scala#L107-L112
 -- The Throttler's configuration value needs to be changed in reconfigure() of LogCleaner.
 --- https://github.com/apache/kafka/blob/4380eae7ceb840dd93fee8ec90cd89a72bad7a3f/core/src/main/scala/kafka/log/LogCleaner.scala#L192-L196
- A workaround is to restart every broker after adding log.cleaner.io.max.bytes.per.second to config/server.properties
[jira] [Updated] (KAFKA-13997) one partition logs are not getting purged
[ https://issues.apache.org/jira/browse/KAFKA-13997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Naveen P updated KAFKA-13997: - Description: We have an issue with one of our topics in the Kafka cluster, which is taking huge space. While checking, we found that one of the partitions doesn't have old logs getting purged; due to this, space is getting full. Since we have replication factor 3 for this topic, the same behaviour is observed on 3 Kafka broker nodes. We need a workaround to clean up old log messages and also to find the root cause of this issue. was: We have an issue with one of our topics in the Kafka cluster, which is taking huge space. While checking, we found that one of the partitions doesn't have old logs getting purged; due to this, space is getting full. Since we have replication factor 3 for this topic, the same behaviour is observed on 2 Kafka broker nodes. We need a workaround to clean up old log messages and also to find the root cause of this issue. > one partition logs are not getting purged > -- > > Key: KAFKA-13997 > URL: https://issues.apache.org/jira/browse/KAFKA-13997 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Affects Versions: 2.0.0 >Reporter: Naveen P >Priority: Major > > We have an issue with one of our topics in the Kafka cluster, which is taking huge space. While checking, we found that one of the partitions doesn't have old logs getting purged; due to this, space is getting full. > > Since we have replication factor 3 for this topic, the same behaviour is observed on 3 Kafka broker nodes. We need a workaround to clean up old log messages and also to find the root cause of this issue.
[jira] [Created] (KAFKA-13997) one partition logs are not getting purged
Naveen P created KAFKA-13997: Summary: one partition logs are not getting purged Key: KAFKA-13997 URL: https://issues.apache.org/jira/browse/KAFKA-13997 Project: Kafka Issue Type: Bug Components: log cleaner Affects Versions: 2.0.0 Reporter: Naveen P We have an issue with one of our topics in the Kafka cluster, which is taking huge space. While checking, we found that one of the partitions doesn't have old logs getting purged; due to this, space is getting full. Since we have replication factor 3 for this topic, the same behaviour is observed on 2 Kafka broker nodes. We need a workaround to clean up old log messages and also to find the root cause of this issue.
[jira] [Commented] (KAFKA-13995) Does Kafka support Network File System (NFS)? Is it recommended in Production?
[ https://issues.apache.org/jira/browse/KAFKA-13995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554577#comment-17554577 ] Devarshi Shah commented on KAFKA-13995: --- A kind request to answer as soon as possible, as it's blocking our deliveries in customers' production environments. > Does Kafka support Network File System (NFS)? Is it recommended in Production? > -- > > Key: KAFKA-13995 > URL: https://issues.apache.org/jira/browse/KAFKA-13995 > Project: Kafka > Issue Type: Test >Affects Versions: 3.0.0 > Environment: Kubernetes Cluster >Reporter: Devarshi Shah >Priority: Blocker > > I've gone through the Apache Kafka documentation. It does not contain information about the supported underlying storage types, i.e. whether Kafka supports block storage, Network File System (NFS) and/or others. On the internet, I could find that it supports NFS; however, most sources advise not to use NFS in production. May we get proper information on whether Kafka recommends NFS in production, or whether it doesn't support NFS to begin with?
[jira] [Updated] (KAFKA-13995) Does Kafka support Network File System (NFS)? Is it recommended in Production?
[ https://issues.apache.org/jira/browse/KAFKA-13995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Devarshi Shah updated KAFKA-13995: -- Description: I've gone through the Apache Kafka documentation. It does not contain information about the supported underlying storage types, i.e. whether Kafka supports block storage, Network File System (NFS) and/or others. On the internet, I could find that it supports NFS; however, most sources advise not to use NFS in production. May we get proper information on whether Kafka recommends NFS in production, or whether it doesn't support NFS to begin with? (was: I've gone through the Apache Kafka documentation. It does not contain information about the supported underlying storage types, i.e. whether Kafka supports block storage or Network File System (NFS). On the internet, I could find that it supports NFS; however, most sources advise not to use NFS in production. May we get proper information on whether Kafka recommends NFS, or whether it doesn't support NFS to begin with?) > Does Kafka support Network File System (NFS)? Is it recommended in Production? > -- > > Key: KAFKA-13995 > URL: https://issues.apache.org/jira/browse/KAFKA-13995 > Project: Kafka > Issue Type: Test >Affects Versions: 3.0.0 > Environment: Kubernetes Cluster >Reporter: Devarshi Shah >Priority: Blocker > > I've gone through the Apache Kafka documentation. It does not contain information about the supported underlying storage types, i.e. whether Kafka supports block storage, Network File System (NFS) and/or others. On the internet, I could find that it supports NFS; however, most sources advise not to use NFS in production. May we get proper information on whether Kafka recommends NFS in production, or whether it doesn't support NFS to begin with?
[jira] [Created] (KAFKA-13995) Does Kafka support Network File System (NFS)? Is it recommended in Production?
Devarshi Shah created KAFKA-13995: - Summary: Does Kafka support Network File System (NFS)? Is it recommended in Production? Key: KAFKA-13995 URL: https://issues.apache.org/jira/browse/KAFKA-13995 Project: Kafka Issue Type: Test Affects Versions: 3.0.0 Environment: Kubernetes Cluster Reporter: Devarshi Shah I've gone through the Apache Kafka documentation. It does not contain information about the supported underlying storage types, i.e. whether Kafka supports block storage or Network File System (NFS). On the internet, I could find that it supports NFS; however, most sources advise not to use NFS in production. May we get proper information on whether Kafka recommends NFS, or whether it doesn't support NFS to begin with?
[GitHub] [kafka] divijvaidya commented on pull request #12224: KAFKA-13943: Make LocalLogManager implementation consistent with the RaftClient interface contract
divijvaidya commented on PR #12224: URL: https://github.com/apache/kafka/pull/12224#issuecomment-1156428420 @jsancio please review when you get a chance. Currently, multiple tests in `QuorumControllerTest` are flaky because we allow creating a snapshot with the LONG_MAX value. This is making it difficult to review PRs due to the flakiness. This code change fixes that.
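For context, the contract violation can be sketched as a guard on the snapshot offset. This is a hypothetical simplification (the method below is not real Kafka code); it only illustrates rejecting the `Long.MAX_VALUE` sentinel that the flaky tests were snapshotting at.

```java
// Sketch: refuse to create a snapshot at the sentinel offset that stands for
// "offset not yet known" in the RaftClient interface contract.
final class SnapshotGuard {
    private SnapshotGuard() { }

    static long validatedSnapshotOffset(long offset) {
        if (offset == Long.MAX_VALUE) {
            throw new IllegalArgumentException(
                "Refusing to create a snapshot at the sentinel offset Long.MAX_VALUE");
        }
        return offset;
    }
}
```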
[jira] [Assigned] (KAFKA-13943) Fix flaky test QuorumControllerTest.testMissingInMemorySnapshot()
[ https://issues.apache.org/jira/browse/KAFKA-13943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya reassigned KAFKA-13943: Assignee: Divij Vaidya > Fix flaky test QuorumControllerTest.testMissingInMemorySnapshot() > - > > Key: KAFKA-13943 > URL: https://issues.apache.org/jira/browse/KAFKA-13943 > Project: Kafka > Issue Type: Test > Components: unit tests >Reporter: Divij Vaidya >Assignee: Divij Vaidya >Priority: Major > Labels: flaky-test > > Test failed at > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12197/3/tests] > > {noformat} > [2022-05-27 09:34:42,382] INFO [Controller 0] Creating new QuorumController > with clusterId wj9LhgPJTV-KYEItgqvtQA, authorizer Optional.empty. > (org.apache.kafka.controller.QuorumController:1484) > [2022-05-27 09:34:42,393] DEBUG [LocalLogManager 0] Node 0: running log > check. (org.apache.kafka.metalog.LocalLogManager:479) > [2022-05-27 09:34:42,394] DEBUG [LocalLogManager 0] initialized local log > manager for node 0 (org.apache.kafka.metalog.LocalLogManager:622) > [2022-05-27 09:34:42,396] INFO [LocalLogManager 0] Node 0: registered > MetaLogListener 1774961169 (org.apache.kafka.metalog.LocalLogManager:640) > [2022-05-27 09:34:42,397] DEBUG [LocalLogManager 0] Node 0: running log > check. (org.apache.kafka.metalog.LocalLogManager:479) > [2022-05-27 09:34:42,397] DEBUG [LocalLogManager 0] Node 0: Executing > handleLeaderChange LeaderAndEpoch(leaderId=OptionalInt[0], epoch=1) > (org.apache.kafka.metalog.LocalLogManager:520) > [2022-05-27 09:34:42,398] DEBUG [Controller 0] Executing > handleLeaderChange[1]. (org.apache.kafka.controller.QuorumController:438) > [2022-05-27 09:34:42,398] INFO [Controller 0] Becoming the active controller > at epoch 1, committed offset -1, committed epoch -1, and metadata.version 5 > (org.apache.kafka.controller.QuorumController:950) > [2022-05-27 09:34:42,398] DEBUG [Controller 0] Creating snapshot -1 > (org.apache.kafka.timeline.SnapshotRegistry:197) > [2022-05-27 09:34:42,399] DEBUG [Controller 0] Processed > handleLeaderChange[1] in 951 us > (org.apache.kafka.controller.QuorumController:385) > [2022-05-27 09:34:42,399] INFO [Controller 0] Initializing metadata.version > to 5 (org.apache.kafka.controller.QuorumController:926) > [2022-05-27 09:34:42,399] INFO [Controller 0] Setting metadata.version to 5 > (org.apache.kafka.controller.FeatureControlManager:273) > [2022-05-27 09:34:42,400] DEBUG [Controller 0] Creating snapshot > 9223372036854775807 (org.apache.kafka.timeline.SnapshotRegistry:197) > [2022-05-27 09:34:42,400] DEBUG [Controller 0] Read-write operation > bootstrapMetadata(1863535402) will be completed when the log reaches offset > 9223372036854775807. 
(org.apache.kafka.controller.QuorumController:725) > [2022-05-27 09:34:42,402] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, > appendTimestamp=10, > records=[ApiMessageAndVersion(RegisterBrokerRecord(brokerId=0, > incarnationId=kxAT73dKQsitIedpiPtwBw, brokerEpoch=-9223372036854775808, > endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, > securityProtocol=0)], features=[], rack=null, fenced=true) at version 0)]), > prevOffset=1) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:247) > [2022-05-27 09:34:42,402] INFO [Controller 0] Registered new broker: > RegisterBrokerRecord(brokerId=0, incarnationId=kxAT73dKQsitIedpiPtwBw, > brokerEpoch=-9223372036854775808, endPoints=[BrokerEndpoint(name='PLAINTEXT', > host='localhost', port=9092, securityProtocol=0)], features=[], rack=null, > fenced=true) (org.apache.kafka.controller.ClusterControlManager:368) > [2022-05-27 09:34:42,403] WARN [Controller 0] registerBroker: failed with > unknown server exception RuntimeException at epoch 1 in 2449 us. Reverting > to last committed offset -1. > (org.apache.kafka.controller.QuorumController:410)java.lang.RuntimeException: > Can't create a new snapshot at epoch 1 because there is already a snapshot > with epoch 9223372036854775807at > org.apache.kafka.timeline.SnapshotRegistry.getOrCreateSnapshot(SnapshotRegistry.java:190) > at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:723) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173) > at java.base/java.lang.Thread.run(Thread.java:833){noformat} > {noformat} > Full stack trace > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnknownServerException: >
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16
divijvaidya commented on code in PR #12284: URL: https://github.com/apache/kafka/pull/12284#discussion_r897932645 ## docs/upgrade.html: ## @@ -59,6 +59,9 @@ Upgrading to 3.2.0 from any vers (or to take advantage of exactly once semantics), the newer Java clients must be used. +Upgrade from Scala 2.12.15 to 2.12.16. See https://github.com/scala/scala/releases/tag/v2.12.16 for release Review Comment: Is this comment in the right place? Currently it seems to be under the section "For a rolling upgrade", which contains instructions for Kafka cluster administrators handling version upgrades; that should have nothing to do with the Scala version, since Scala is bundled as a dependency, independent of any Scala version on the OS. @ijuma will know more about this.
[GitHub] [kafka] divijvaidya commented on pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16
divijvaidya commented on PR #12284: URL: https://github.com/apache/kafka/pull/12284#issuecomment-1156421242 cc: @ijuma (since you seem to have performed Scala upgrades in the past) The test failures do not seem related to this code change to me.
[GitHub] [kafka] divijvaidya commented on pull request #12286: KAFKA-13984: Fix TopicBasedRemoteLogMetadataManager#initializeResources should exit immediately when partition size of __remote_log_metad
divijvaidya commented on PR #12286: URL: https://github.com/apache/kafka/pull/12286#issuecomment-1156415549 @bozhao12 can you please add a unit test in `TopicBasedRemoteLogMetadataManagerTest` that fails before this change and succeeds after this change.
[GitHub] [kafka] cadonna commented on pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for Streams…
cadonna commented on PR #10881: URL: https://github.com/apache/kafka/pull/10881#issuecomment-1156400720 @wycc Do you plan to still work on this PR?
[GitHub] [kafka] cadonna commented on pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for Streams…
cadonna commented on PR #10881: URL: https://github.com/apache/kafka/pull/10881#issuecomment-1156398967 @clolov Thank you for your interest and help! Since this PR has not been touched for more than half a year, I would be fine with closing it, and you can open a new one.
[GitHub] [kafka] cadonna commented on pull request #12285: KAFKA-7342 Part 1: Straightforward JUnit4 to JUnit5 migrations
cadonna commented on PR #12285: URL: https://github.com/apache/kafka/pull/12285#issuecomment-1156391414 @clolov Thank you for the PR! I agree with @divijvaidya about doing the reformatting in a separate PR. Could you also try to subdivide the PR into smaller PRs? Reviewing a 6500-line PR is never fun. Sizes around 500 are acceptable.
[GitHub] [kafka] jnh5y commented on pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies
jnh5y commented on PR #12161: URL: https://github.com/apache/kafka/pull/12161#issuecomment-1156385644 > > @jnh5y Thank you for the updates! > > LGTM! > > Had just one nit. > > Thank you for your patience! @cadonna Thank you for pushing me and helping me learn more about streams!
[GitHub] [kafka] jnh5y commented on a diff in pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies
jnh5y commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r897893883 ## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java: ## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; +import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.hamcrest.CoreMatchers; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.KeyValue.pair; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled; +import static 
org.apache.kafka.test.TestUtils.waitForCondition; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category({IntegrationTest.class}) +public class PauseResumeIntegrationTest { +private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45); +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); +private static Properties producerConfig; +private static Properties consumerConfig; + +private static final Materialized> IN_MEMORY_STORE = +Materialized.as(Stores.inMemoryKeyValueStore("store")); + +private static final String INPUT_STREAM_1 = "input-stream-1"; +private static final String INPUT_STREAM_2 = "input-stream-2"; +private static final String OUTPUT_STREAM_1 = "output-stream-1"; +private static final String OUTPUT_STREAM_2 = "output-stream-2"; +private static final String TOPOLOGY1 = "topology1"; +private static final String TOPOLOGY2 = "topology2"; + +private static final List> STANDARD_INPUT_DATA = +asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L)); +private static final List> COUNT_OUTPUT_DATA = +asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L)); +private
[GitHub] [kafka] cadonna commented on a diff in pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies
cadonna commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r897887371 ## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java: ## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; +import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.hamcrest.CoreMatchers; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.KeyValue.pair; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled; +import static 
org.apache.kafka.test.TestUtils.waitForCondition; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category({IntegrationTest.class}) +public class PauseResumeIntegrationTest { +private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45); +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); +private static Properties producerConfig; +private static Properties consumerConfig; + +private static final Materialized> IN_MEMORY_STORE = +Materialized.as(Stores.inMemoryKeyValueStore("store")); + +private static final String INPUT_STREAM_1 = "input-stream-1"; +private static final String INPUT_STREAM_2 = "input-stream-2"; +private static final String OUTPUT_STREAM_1 = "output-stream-1"; +private static final String OUTPUT_STREAM_2 = "output-stream-2"; +private static final String TOPOLOGY1 = "topology1"; +private static final String TOPOLOGY2 = "topology2"; + +private static final List> STANDARD_INPUT_DATA = +asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L)); +private static final List> COUNT_OUTPUT_DATA = +asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L)); +private
[GitHub] [kafka] jnh5y commented on a diff in pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies
jnh5y commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r897869162 ## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java: ## @@ -335,7 +333,39 @@ public void pauseResumehouldWorkAcrossInstances() throws Exception { kafkaStreams.resume(); waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); -awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA); +awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA); +} + +@Test +public void pausedTopologyShouldNotRestoreStateStores() throws Exception { +produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + +kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1); +kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1); Review Comment: My mistake; I've updated the test.
[GitHub] [kafka] jnh5y commented on a diff in pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies
jnh5y commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r897868472 ## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java: ## @@ -335,7 +333,39 @@ public void pauseResumehouldWorkAcrossInstances() throws Exception { kafkaStreams.resume(); waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); -awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA); +awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA); +} + +@Test +public void pausedTopologyShouldNotRestoreStateStores() throws Exception { +produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + +kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1); +kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1); +kafkaStreams.start(); +kafkaStreams2.start(); + +waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), State.RUNNING, STARTUP_TIMEOUT); + +awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA); + +kafkaStreams.close(); +kafkaStreams2.close(); + +kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1); +kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1); +kafkaStreams.cleanUp(); +kafkaStreams2.cleanUp(); + +kafkaStreams.pause(); +kafkaStreams2.pause(); +kafkaStreams.start(); +kafkaStreams2.start(); + +waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), State.REBALANCING, STARTUP_TIMEOUT); + +assertTrue(kafkaStreams.allLocalStorePartitionLags().isEmpty()); +assertTrue(kafkaStreams2.allLocalStorePartitionLags().isEmpty()); Review Comment: Thank you! I've added these changes.
[GitHub] [kafka] divijvaidya commented on pull request #12230: MINOR: Catch InvocationTargetException explicitly and propagate underlying cause
divijvaidya commented on PR #12230: URL: https://github.com/apache/kafka/pull/12230#issuecomment-1156355538 @dengziming @showuon please review this small change.
[GitHub] [kafka] divijvaidya commented on pull request #12229: MINOR: Include the inner exception stack trace when re-throwing an exception
divijvaidya commented on PR #12229: URL: https://github.com/apache/kafka/pull/12229#issuecomment-1156352282 @mimaison perhaps you may want to look into this? This already has 2 approvals from non-committers.
[jira] [Commented] (KAFKA-13386) Foreign Key Join filtering out valid records after a code change / schema evolved
[ https://issues.apache.org/jira/browse/KAFKA-13386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554526#comment-17554526 ] Kin Siu commented on KAFKA-13386: - [~guozhang], I am not sure if what I faced is the same as [~sduran], but I hit an issue while load testing my application, which contains an FK join: there can be no output until the very last left-hand-side record is processed. Simplified test data in my load testing:
|| - || Key Fields || Value Fields ||
| Left hand side table | K1 | FK1, DF1, UpdateTs |
| Right hand side table | FK1 | V1, V2 |
It is a simple FK join of the left-hand-side table and the right-hand-side table on field "FK1"; for each left-hand-side update, I changed DF1 and UpdateTs. When we increase the left-hand-side table's publishing rate, at some point when the rate is high, the application stops generating any output until the last left-hand-side record is processed. Say I ran the test for 10 minutes: we can end up receiving output only in the last few seconds. And instead of having the same number of join outputs as left-hand-side updates + right-hand-side updates, we received a lot less. I believe it is the same issue as described above: the "hash" compared when processing the right-hand-side response is the latest left-hand-side hash, and while in my test data the FK relation remained the same, the "hash" changed because the values of "DF1" and "UpdateTs" changed. > Foreign Key Join filtering out valid records after a code change / schema > evolved > - > > Key: KAFKA-13386 > URL: https://issues.apache.org/jira/browse/KAFKA-13386 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.2 >Reporter: Sergio Duran Vegas >Priority: Major > > The join optimization assumes the serializer is deterministic and invariant > across upgrades, so in case of changes this optimization will drop > valid/intermediate records. In other situations we have relied on the same > property, for example when computing whether an update is a duplicate result > or not. > > The problem is that some serializers are sadly not deterministic. > > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java] > > {code:java} > // If this value doesn't match the current value from the original table, it > is stale and should be discarded. > if (java.util.Arrays.equals(messageHash, currentHash)) {{code} > > A solution to this problem would be for the comparison to use the foreign-key > reference itself instead of the whole message hash. > > The bug fix proposal is to allow the user to choose between one method of > comparison or the other (whole hash or FK reference). This would fix the > problem of dropping valid records in certain cases and allow the user to also > choose the current optimized way of checking valid records and dropping > intermediate results.
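To make the failure mode concrete, the following is a hedged Java sketch (hypothetical names and serialization format, not the actual Streams code) of why a hash over the whole serialized value is fragile when non-key fields such as UpdateTs change:

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Sketch: the subscription response carries a hash of the LHS value taken at
// subscription time; if any field changed since then (even a non-FK field),
// the hashes differ and the join result is dropped as "stale".
public class FkJoinHashExample {
    static byte[] hash(String serializedValue) throws NoSuchAlgorithmException {
        return MessageDigest.getInstance("SHA-256")
                .digest(serializedValue.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        byte[] hashAtSubscription = hash("FK1|DF1|ts=1000");
        byte[] hashAtResolution = hash("FK1|DF2|ts=2000"); // same FK, other fields changed

        // Same comparison style as the quoted code: the record is discarded
        // even though the foreign key itself never changed.
        System.out.println(Arrays.equals(hashAtSubscription, hashAtResolution)); // false
    }
}
{code}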
[GitHub] [kafka] divijvaidya commented on pull request #12229: MINOR: Include the inner exception stack trace when re-throwing an exception
divijvaidya commented on PR #12229: URL: https://github.com/apache/kafka/pull/12229#issuecomment-1156349743 > Can this kind of problem be caught by spotbugs? manual checking is error prone. Agreed @dengziming, but unfortunately spotbugs isn't catching such errors.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r897838652 ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -34,35 +35,56 @@ */ public abstract class SampledStat implements MeasurableStat { -private double initialValue; +private final double initialValue; +/** + * Index of the latest stored sample. + */ private int current = 0; +/** + * Stores the recorded samples in a ring buffer. + */ protected List samples; public SampledStat(double initialValue) { this.initialValue = initialValue; this.samples = new ArrayList<>(2); } +/** + * {@inheritDoc} + * + * On every record, do the following: + * 1. Check if the current window has expired + * 2. If yes, then advance the current pointer to new window. The start time of the new window is set to nearest + *possible starting point for the new window. The nearest starting point occurs at config.timeWindowMs intervals + *from the end time of last known window. + * 3. Update the recorded value for the current window + * 4. Increase the number of event count + */ @Override -public void record(MetricConfig config, double value, long timeMs) { -Sample sample = current(timeMs); -if (sample.isComplete(timeMs, config)) -sample = advance(config, timeMs); -update(sample, config, value, timeMs); -sample.eventCount += 1; +public void record(MetricConfig config, double value, long recordingTimeMs) { +Sample sample = current(recordingTimeMs); +if (sample.isComplete(recordingTimeMs, config)) { +final long previousWindowStartTime = sample.lastWindowMs; +final long previousWindowEndtime = previousWindowStartTime + config.timeWindowMs(); +final long startTimeOfNewWindow = recordingTimeMs - ((recordingTimeMs - previousWindowEndtime) % config.timeWindowMs()); Review Comment: That is a great observation, Tom! Ideally the code should ensure that recording a metric does not block, because the operation is wall-clock-time sensitive. But as you observed, we have `synchronized` in multiple places, which may lead to a sample being recorded in a window which has already completed in the past. For cases where the `sensor` is used for calculating the ConnectionQuota [1], this problem wouldn't occur, because the call to `Time.milliseconds` is done inside a `synchronized` block, which ensures that only one thread with the latest timestamp will be accessing sensor.record at a time. But code paths other than ConnectionQuota also use the sensor, and for those your observation is valid. Since this problem is independent of this code change, and breaks existing logic if/when recordingTimeMs < endTimeOfPreviousWindow, I have created a JIRA to address it in a separate PR: https://issues.apache.org/jira/browse/KAFKA-13994 [1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L1541-L1542
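For reference, a small worked example of the window-alignment arithmetic in the quoted diff (the constants are illustrative; only the formula comes from the diff):

```java
// Sketch: the new window starts at the nearest timeWindowMs boundary measured
// from the end of the previous window, at or before the recording time.
public class WindowAlignmentExample {
    public static void main(String[] args) {
        long timeWindowMs = 1_000L;
        long previousWindowStartTime = 10_000L;
        long previousWindowEndTime = previousWindowStartTime + timeWindowMs; // 11_000

        long recordingTimeMs = 13_700L; // arrives 2.7 windows after the previous window ended
        long startTimeOfNewWindow =
            recordingTimeMs - ((recordingTimeMs - previousWindowEndTime) % timeWindowMs);

        System.out.println(startTimeOfNewWindow); // 13_000, i.e. 11_000 + 2 * 1_000
    }
}
```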
[jira] [Created] (KAFKA-13994) Incorrect quota calculation due to a bug
Divij Vaidya created KAFKA-13994: Summary: Incorrect quota calculation due to a bug Key: KAFKA-13994 URL: https://issues.apache.org/jira/browse/KAFKA-13994 Project: Kafka Issue Type: Bug Components: core Reporter: Divij Vaidya *Problem* This was noted by [~tombentley] at [https://github.com/apache/kafka/pull/12045#discussion_r895592286] The completion of a sample window in `SampledStat.java` is based on a comparison of `recordingTimeMs` with startTimeOfPreviousWindow [1]. `recordingTimeMs` is calculated via System.currentTimeMillis(), which: 1. is not guaranteed to be monotonically increasing due to clock drift; 2. is not necessarily the current time when it arrives at [1], because the thread may be blocked at `synchronized` in {{Sensor.recordInternal}} [2], since synchronized provides no guarantee about fairness for blocked threads. Hence, it is possible that when the isComplete comparison is made at [1], recordingTimeMs < endTimeOfCurrentWindow whereas wallClockTimeAtTheMoment > startTimeOfCurrentWindow + window length. The implications would be: 1. The current sample window will not be considered complete even if it has completed as per wall clock time. 2. The value will be recorded in a sample window which has elapsed, instead of the new window where it belongs. Due to the above two implications, the metrics captured by the sensor may not be correct, which could lead to incorrect quota calculations. [1] [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java#L138] [2] https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java#L232
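A hedged sketch of the race described above (hypothetical classes; the real Sensor/SampledStat code paths differ):

{code:java}
// Sketch: a timestamp captured before blocking on a lock can be stale by the
// time it is used, so a sample may land in a window that has already elapsed.
public class StaleTimestampExample {
    private final Object lock = new Object();

    void recordOutsideLock(double value) {
        long now = System.currentTimeMillis(); // may grow stale while waiting below
        synchronized (lock) {
            record(value, now);
        }
    }

    // Reading the clock inside the lock (as the ConnectionQuota path does)
    // means each record() sees a timestamp taken while holding the lock, so
    // timestamps are non-decreasing across threads, modulo clock drift.
    void recordInsideLock(double value) {
        synchronized (lock) {
            record(value, System.currentTimeMillis());
        }
    }

    private void record(double value, long timeMs) {
        // placeholder for SampledStat-style bookkeeping
    }
}
{code}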
[GitHub] [kafka] divijvaidya commented on pull request #12184: KAFKA-13911: Fix the rate window size calculation for edge cases
divijvaidya commented on PR #12184: URL: https://github.com/apache/kafka/pull/12184#issuecomment-1156329800 @dajac @guozhangwang please review when you get a chance.
[GitHub] [kafka] cadonna commented on a diff in pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies
cadonna commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r897835619 ## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java: ## @@ -335,7 +333,39 @@ public void pauseResumehouldWorkAcrossInstances() throws Exception { kafkaStreams.resume(); waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); -awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA); +awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA); +} + +@Test +public void pausedTopologyShouldNotRestoreStateStores() throws Exception { +produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); + +kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1); +kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1); +kafkaStreams.start(); +kafkaStreams2.start(); + +waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), State.RUNNING, STARTUP_TIMEOUT); + +awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA); + +kafkaStreams.close(); +kafkaStreams2.close(); + +kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1); +kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1); +kafkaStreams.cleanUp(); +kafkaStreams2.cleanUp(); + +kafkaStreams.pause(); +kafkaStreams2.pause(); +kafkaStreams.start(); +kafkaStreams2.start(); + +waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), State.REBALANCING, STARTUP_TIMEOUT); + +assertTrue(kafkaStreams.allLocalStorePartitionLags().isEmpty()); +assertTrue(kafkaStreams2.allLocalStorePartitionLags().isEmpty()); Review Comment: You could do something like: ``` waitForApplicationState(Arrays.asList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT); waitForCondition( () -> !kafkaStreams.allLocalStorePartitionLags().isEmpty(), "Lags for local store partitions were not found within the timeout!"); waitUntilStreamsHasPolled(kafkaStreams, 2); final long stateStoreLag1 = kafkaStreams.allLocalStorePartitionLags().get("test-store").get(0).offsetLag(); waitUntilStreamsHasPolled(kafkaStreams, 2); final long stateStoreLag2 = kafkaStreams.allLocalStorePartitionLags().get("test-store").get(0).offsetLag(); assertTrue(stateStoreLag1 > 0); assertEquals(stateStoreLag1, stateStoreLag2); ``` This code just considers one Streams client. You need to add `Materialized.as("test-store")` to the call to `count()` in your topology. As soon as you have activated the standbys, you need to do the same for the second Streams client.
[GitHub] [kafka] tombentley commented on a diff in pull request #11781: KAFKA-10000: Per-connector offsets topics (KIP-618)
tombentley commented on code in PR #11781: URL: https://github.com/apache/kafka/pull/11781#discussion_r897830233 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ## @@ -274,18 +276,23 @@ public static NewTopicBuilder defineTopic(String topicName) { * @param adminConfig the configuration for the {@link Admin} */ public TopicAdmin(Map adminConfig) { -this(adminConfig, Admin.create(adminConfig)); +this(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), Admin.create(adminConfig)); } -// visible for testing -TopicAdmin(Map adminConfig, Admin adminClient) { -this(adminConfig, adminClient, true); +/** + * Create a new topic admin using the provided {@link Admin} + * + * @param bootstrapServers the Kafka cluster targeted by the admin + * @param adminClient the {@link Admin} to use under the hood + */ +public TopicAdmin(Object bootstrapServers, Admin adminClient) { Review Comment: This constructor kinda confuses the ownership of the `Admin` client. I think things are cleaner when the TopicAdmin instantiates (and thus owns) the `admin`. Note, it looks like there are no callers for `TopicAdmin.admin`. It seems that the call sites in `doBuild` could simply pass the map of configs (and the `bootstrapServers` looked up from that), rather than instantiating the `admin` and then passing it to the TopicAdmin. Obviously the test code has slightly different requirements, meaning we still need this constructor. I did also wonder whether we could also get rid of `bootstrapServers` by defining `toString` on `KafkaAdminClient` and using that for the logging and exceptions here in `TopicAdmin`. Perhaps that's worth a follow-up PR at some point (though perhaps there are benefits to hiding bootstrap servers from receivers of clients).
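As a sketch of the ownership arrangement described as cleaner above (a hypothetical simplification, not the PR's actual code):

```java
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Sketch: the wrapper instantiates the Admin client itself and therefore
// unambiguously owns its lifecycle.
class OwningTopicAdmin implements AutoCloseable {
    private final Object bootstrapServers;
    private final Admin admin;

    OwningTopicAdmin(Map<String, Object> adminConfig) {
        this.bootstrapServers = adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG);
        this.admin = Admin.create(adminConfig); // created here, closed here
    }

    @Override
    public String toString() {
        return "OwningTopicAdmin(" + bootstrapServers + ")";
    }

    @Override
    public void close() {
        admin.close();
    }
}
```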
[GitHub] [kafka] cadonna commented on a diff in pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies
cadonna commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r897818350

## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:

@@ -335,7 +333,39 @@ public void pauseResumehouldWorkAcrossInstances() throws Exception {
         kafkaStreams.resume();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
-        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+    }
+
+    @Test
+    public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.start();
+        kafkaStreams2.start();
+
+        waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), State.RUNNING, STARTUP_TIMEOUT);
+
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+
+        kafkaStreams.close();
+        kafkaStreams2.close();
+
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.cleanUp();
+        kafkaStreams2.cleanUp();
+
+        kafkaStreams.pause();
+        kafkaStreams2.pause();
+        kafkaStreams.start();
+        kafkaStreams2.start();
+
+        waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), State.REBALANCING, STARTUP_TIMEOUT);
+
+        assertTrue(kafkaStreams.allLocalStorePartitionLags().isEmpty());
+        assertTrue(kafkaStreams2.allLocalStorePartitionLags().isEmpty());

Review Comment: I played around with the test a bit, and indeed, if you add a `Thread.sleep(2000)` before these asserts, the test fails because the returned map is not empty. That means the assignment was not finished before the asserts were called.
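A hedged sketch of how the test could wait out this race instead of asserting immediately after the state transition; it uses the existing `TestUtils.waitForCondition` helper, and the timeout message is illustrative:

```java
import org.apache.kafka.test.TestUtils;

// Poll until the assignment has settled instead of asserting right after the
// state change; this removes the window that the Thread.sleep(2000) exposes.
TestUtils.waitForCondition(
    () -> !kafkaStreams.allLocalStorePartitionLags().isEmpty(),
    "Local store partition lags were not reported within the timeout");
```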
[jira] [Updated] (KAFKA-13993) Large log.cleaner.buffer.size config breaks Kafka Broker
[ https://issues.apache.org/jira/browse/KAFKA-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tomohiro Hashidate updated KAFKA-13993:
---------------------------------------
Description:
LogCleaner builds a Cleaner instance in the following way:

```
val cleaner = new Cleaner(id = threadId,
  offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
    hashAlgorithm = config.hashAlgorithm),
  ioBufferSize = config.ioBufferSize / config.numThreads / 2,
  maxIoBufferSize = config.maxMessageSize,
  dupBufferLoadFactor = config.dedupeBufferLoadFactor,
  throttler = throttler,
  time = time,
  checkDone = checkDone)
```

If `log.cleaner.buffer.size` / `log.cleaner.threads` is larger than Int.MaxValue, SkimpyOffsetMap uses Int.MaxValue and tries to allocate a ByteBuffer with Int.MaxValue capacity. But in the HotSpot VM implementation the maximum array size is Int.MaxValue - 5, and according to ArraysSupport in OpenJDK, SOFT_MAX_ARRAY_LENGTH is Int.MaxValue - 8 (the safer bound): https://github.com/openjdk/jdk17u/blob/master/src/java.base/share/classes/jdk/internal/util/ArraysSupport.java#L589

If the ByteBuffer capacity exceeds the maximum array length, the Kafka broker fails to start:

```
[2022-06-14 18:08:09,609] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:45)
    at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:300)
    at kafka.log.LogCleaner.$anonfun$startup$2(LogCleaner.scala:155)
    at kafka.log.LogCleaner.startup(LogCleaner.scala:154)
    at kafka.log.LogManager.startup(LogManager.scala:435)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:291)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)
```

I suggest using `Int.MaxValue - 8` instead of `Int.MaxValue`.

was:
LogCleaner builds a Cleaner instance in the following way:

```
val cleaner = new Cleaner(id = threadId,
  offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
    hashAlgorithm = config.hashAlgorithm),
  ioBufferSize = config.ioBufferSize / config.numThreads / 2,
  maxIoBufferSize = config.maxMessageSize,
  dupBufferLoadFactor = config.dedupeBufferLoadFactor,
  throttler = throttler,
  time = time,
  checkDone = checkDone)
```

If `log.cleaner.buffer.size` / `log.cleaner.threads` is larger than Int.MaxValue, SkimpyOffsetMap uses Int.MaxValue and tries to allocate a ByteBuffer with Int.MaxValue capacity. But in the HotSpot VM implementation the maximum array size is Int.MaxValue - 5, and according to ArraysSupport in OpenJDK, SOFT_MAX_ARRAY_LENGTH is Int.MaxValue - 8 (the safer bound).

If the ByteBuffer capacity exceeds the maximum array length, the Kafka broker fails to start:

```
[2022-06-14 18:08:09,609] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
```
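The fix the ticket proposes can be illustrated with a short sketch. This is a Java illustration of the idea only (the actual LogCleaner code is Scala), and the helper method name is hypothetical:

```java
// Mirrors jdk.internal.util.ArraysSupport.SOFT_MAX_ARRAY_LENGTH: the largest
// array size that HotSpot can reliably allocate.
static final int SOFT_MAX_ARRAY_LENGTH = Integer.MAX_VALUE - 8;

// Hypothetical helper: clamp the per-thread dedupe buffer to the soft maximum
// instead of Integer.MAX_VALUE, so ByteBuffer.allocate() cannot exceed the VM limit.
static int dedupeBufferCapacity(long dedupeBufferSize, int numThreads) {
    return (int) Math.min(dedupeBufferSize / numThreads, SOFT_MAX_ARRAY_LENGTH);
}
```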
[GitHub] [kafka] tombentley commented on a diff in pull request #11781: KAFKA-10000: Per-connector offsets topics (KIP-618)
tombentley commented on code in PR #11781: URL: https://github.com/apache/kafka/pull/11781#discussion_r897812219

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:

@@ -1327,30 +1334,39 @@ public WorkerTask doBuild(Task task,
                 connectorClientConfigOverridePolicy, kafkaClusterId);
         KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);

-        TopicAdmin topicAdmin;
+        // Prepare to create a topic admin if the task requires one, but do not actually create an instance
+        // until/unless one is needed
+        final AtomicReference<TopicAdmin> topicAdmin = new AtomicReference<>();
+        final Supplier<TopicAdmin> topicAdminCreator = () -> topicAdmin.updateAndGet(existingAdmin -> {
+            if (existingAdmin != null) {
+                return existingAdmin;
+            }
+            Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                    sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+            Admin adminClient = Admin.create(adminOverrides);
+            return new TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient);
+        });
+
         Map<String, TopicCreationGroup> topicCreationGroups;
         if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
             topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
             // Create a topic admin that the task can use for topic creation
-            Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config,
-                    sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
-            topicAdmin = new TopicAdmin(adminOverrides);
+            topicAdminCreator.get();

Review Comment: Thanks! When you have time, it would be great if you could rebase that PR.
[jira] [Updated] (KAFKA-13993) Large log.cleaner.buffer.size config breaks Kafka Broker
[ https://issues.apache.org/jira/browse/KAFKA-13993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tomohiro Hashidate updated KAFKA-13993:
---------------------------------------
Description:
LogCleaner builds a Cleaner instance in the following way:

```
val cleaner = new Cleaner(id = threadId,
  offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
    hashAlgorithm = config.hashAlgorithm),
  ioBufferSize = config.ioBufferSize / config.numThreads / 2,
  maxIoBufferSize = config.maxMessageSize,
  dupBufferLoadFactor = config.dedupeBufferLoadFactor,
  throttler = throttler,
  time = time,
  checkDone = checkDone)
```

If `log.cleaner.buffer.size` / `log.cleaner.threads` is larger than Int.MaxValue, SkimpyOffsetMap uses Int.MaxValue and tries to allocate a ByteBuffer with Int.MaxValue capacity. But in the HotSpot VM implementation the maximum array size is Int.MaxValue - 5, and according to ArraysSupport in OpenJDK, SOFT_MAX_ARRAY_LENGTH is Int.MaxValue - 8 (the safer bound).

If the ByteBuffer capacity exceeds the maximum array length, the Kafka broker fails to start:

```
[2022-06-14 18:08:09,609] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:45)
    at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:300)
    at kafka.log.LogCleaner.$anonfun$startup$2(LogCleaner.scala:155)
    at kafka.log.LogCleaner.startup(LogCleaner.scala:154)
    at kafka.log.LogManager.startup(LogManager.scala:435)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:291)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)
```

I suggest using `Int.MaxValue - 8` instead of `Int.MaxValue`.

was:
LogCleaner builds a Cleaner instance in the following way:

```
val cleaner = new Cleaner(id = threadId,
  offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
    hashAlgorithm = config.hashAlgorithm),
  ioBufferSize = config.ioBufferSize / config.numThreads / 2,
  maxIoBufferSize = config.maxMessageSize,
  dupBufferLoadFactor = config.dedupeBufferLoadFactor,
  throttler = throttler,
  time = time,
  checkDone = checkDone)
```

If `log.cleaner.buffer.size` / `log.cleaner.threads` is larger than Int.MaxValue, SkimpyOffsetMap uses Int.MaxValue and tries to allocate a ByteBuffer with Int.MaxValue capacity. But in the HotSpot VM implementation the maximum array size is Int.MaxValue - 5, and according to ArraysSupport in OpenJDK, SOFT_MAX_ARRAY_LENGTH is Int.MaxValue - 8 (the safer bound).

If the ByteBuffer capacity exceeds the maximum array length, the Kafka broker fails to start:

```
[2022-06-14 18:08:09,609] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:45)
    at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:300)
```
[jira] [Created] (KAFKA-13993) Large log.cleaner.buffer.size config breaks Kafka Broker
Tomohiro Hashidate created KAFKA-13993:
---------------------------------------

             Summary: Large log.cleaner.buffer.size config breaks Kafka Broker
                 Key: KAFKA-13993
                 URL: https://issues.apache.org/jira/browse/KAFKA-13993
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 3.1.1, 3.2.0, 3.0.1, 2.8.1, 2.7.2
            Reporter: Tomohiro Hashidate

LogCleaner builds a Cleaner instance in the following way:

```
val cleaner = new Cleaner(id = threadId,
  offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
    hashAlgorithm = config.hashAlgorithm),
  ioBufferSize = config.ioBufferSize / config.numThreads / 2,
  maxIoBufferSize = config.maxMessageSize,
  dupBufferLoadFactor = config.dedupeBufferLoadFactor,
  throttler = throttler,
  time = time,
  checkDone = checkDone)
```

If `log.cleaner.buffer.size` / `log.cleaner.threads` is larger than Int.MaxValue, SkimpyOffsetMap uses Int.MaxValue and tries to allocate a ByteBuffer with Int.MaxValue capacity. But in the HotSpot VM implementation the maximum array size is Int.MaxValue - 5, and according to ArraysSupport in OpenJDK, SOFT_MAX_ARRAY_LENGTH is Int.MaxValue - 8 (the safer bound).

If the ByteBuffer capacity exceeds the maximum array length, the Kafka broker fails to start:

```
[2022-06-14 18:08:09,609] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:45)
    at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:300)
    at kafka.log.LogCleaner.$anonfun$startup$2(LogCleaner.scala:155)
    at kafka.log.LogCleaner.startup(LogCleaner.scala:154)
    at kafka.log.LogManager.startup(LogManager.scala:435)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:291)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)
```

I suggest using `Int.MaxValue - 8` instead of `Int.MaxValue`.
[GitHub] [kafka] cadonna commented on a diff in pull request #12161: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies
cadonna commented on code in PR #12161: URL: https://github.com/apache/kafka/pull/12161#discussion_r897789473

## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:

@@ -335,7 +333,39 @@ public void pauseResumehouldWorkAcrossInstances() throws Exception {
         kafkaStreams.resume();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
-        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+    }
+
+    @Test
+    public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.start();
+        kafkaStreams2.start();
+
+        waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), State.RUNNING, STARTUP_TIMEOUT);
+
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+
+        kafkaStreams.close();
+        kafkaStreams2.close();
+
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams.cleanUp();
+        kafkaStreams2.cleanUp();
+
+        kafkaStreams.pause();
+        kafkaStreams2.pause();
+        kafkaStreams.start();
+        kafkaStreams2.start();
+
+        waitForApplicationState(Arrays.asList(kafkaStreams, kafkaStreams2), State.REBALANCING, STARTUP_TIMEOUT);
+
+        assertTrue(kafkaStreams.allLocalStorePartitionLags().isEmpty());
+        assertTrue(kafkaStreams2.allLocalStorePartitionLags().isEmpty());

Review Comment: Why are you verifying for emptiness? I would expect that there are entries for the state stores with a lag greater than 0.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:

@@ -479,6 +485,47 @@ public void restore(final Map<TaskId, Task> tasks) {
         }
     }

+    private void updateStandbyPartitions(final Map<TaskId, Task> tasks,

Review Comment: Do not forget to rename this method to something more meaningful. Proposal: `pauseResumePartitions()`

## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:

@@ -335,7 +333,39 @@ public void pauseResumehouldWorkAcrossInstances() throws Exception {
         kafkaStreams.resume();
         waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);
-        awaitOutput(OUTPUT_STREAM_1, 3, COUNT_OUTPUT_DATA);
+        awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
+    }
+
+    @Test
+    public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
+        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
+
+        kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
+        kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_1);

Review Comment: If you do not use standby tasks, there is no reason to use two Kafka Streams clients. I would propose to use one standby only for this test. For that you need to set `num.standby.replicas` to 1. That has the effect that one client gets the active store assigned and the other gets the standby store assigned.
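A sketch of the configuration change proposed in the last comment; where exactly the Properties are built is an assumption about the test's setup code:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// With one standby replica, one client is assigned the active store and the
// other its standby, so both clients report meaningful store lags.
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
```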
[GitHub] [kafka] mdedetrich commented on pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16
mdedetrich commented on PR #12284: URL: https://github.com/apache/kafka/pull/12284#issuecomment-1156251953

@divijvaidya Done
[GitHub] [kafka] divijvaidya commented on pull request #12284: KAFKA-13980: Upgrade from Scala 2.12.15 to 2.12.16
divijvaidya commented on PR #12284: URL: https://github.com/apache/kafka/pull/12284#issuecomment-1156237309

@mdedetrich could you please re-run the tests (by pushing another commit, or by rebasing from trunk and force-pushing)? It would be ideal to have a clean test run (with only known flaky failures) before we approve this.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r897722547

## clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java:

@@ -608,14 +609,14 @@ public void testRateWindowing() throws Exception {
         time.sleep(cfg.timeWindowMs() / 2); // prior to any time passing

-        double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;
+        double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + (((double) cfg.timeWindowMs()) / 2.0d)) / 1000.0d;

Review Comment: Thanks for catching this. I have fixed this in the latest revision.
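The diff above fixes an integer-division truncation; a tiny self-contained illustration with assumed values:

```java
// Assumed window length; any odd value exposes the truncation.
long timeWindowMs = 30_001L;

double truncated = (timeWindowMs / 2) / 1000.0;        // long division first: 15_000 / 1000.0 = 15.0
double exact = ((double) timeWindowMs) / 2.0 / 1000.0; // 15.0005

// The explicit cast in the fixed line keeps the half-window fraction
// instead of silently discarding it.
```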
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r897733191

## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:

@@ -34,35 +35,56 @@
  */
 public abstract class SampledStat implements MeasurableStat {

-    private double initialValue;
+    private final double initialValue;
+    /**
+     * Index of the latest stored sample.
+     */
     private int current = 0;
+    /**
+     * Stores the recorded samples in a ring buffer.

Review Comment: That sounds fair. I have fixed the Javadoc in the latest revision as per your suggestion.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r897733191

## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:

@@ -34,35 +35,56 @@
  */
 public abstract class SampledStat implements MeasurableStat {

-    private double initialValue;
+    private final double initialValue;
+    /**
+     * Index of the latest stored sample.
+     */
     private int current = 0;
+    /**
+     * Stores the recorded samples in a ring buffer.

Review Comment: That sounds fair. I have fixed the Javadoc in the latest revision and replaced it with the following:
```
/**
 * Stores the recorded samples.
 * Note that the previously recorded samples may be overwritten/reset if they are considered obsolete by the
 * {@link Sample#purgeObsoleteSamples} function.
 */
```
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r897724648

## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:

@@ -84,13 +106,7 @@ public Sample current(long timeMs) {
     public Sample oldest(long now) {
         if (samples.size() == 0)
             this.samples.add(newSample(now));
-        Sample oldest = this.samples.get(0);
-        for (int i = 1; i < this.samples.size(); i++) {
-            Sample curr = this.samples.get(i);
-            if (curr.lastWindowMs < oldest.lastWindowMs)
-                oldest = curr;
-        }
-        return oldest;
+        return samples.stream().min(Comparator.comparingLong(s -> s.lastWindowMs)).orElse(samples.get(0));

Review Comment: I find the new code more readable, since we can immediately eyeball that a min is being calculated, whereas in the previous version we have to follow the assignments and the logic of the for loop to work out what is going on. Nevertheless, I don't have a strong opinion on this one. If you still think we need to revert it, I will do it. Let me know.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r897724922

## clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java:

@@ -68,24 +68,55 @@ public double measure(MetricConfig config, long now) {
     }

     public long windowSize(MetricConfig config, long now) {
-        // purge old samples before we compute the window size
+        // Purge obsolete samples. Obsolete samples are the ones which are not relevant to the current calculation
+        // because their creation time is outside (before) the duration of time window used to calculate rate.
         stat.purgeObsoleteSamples(config, now);

         /*
          * Here we check the total amount of time elapsed since the oldest non-obsolete window.
-         * This give the total windowSize of the batch which is the time used for Rate computation.
-         * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second
-         * window, the measured rate will be very high.
-         * Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
+         * This gives the duration of computation time window which used to calculate Rate.

Review Comment: Thanks for catching this. I have fixed this in the latest revision.
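For readers without the full file, the floor described in the removed comment block can be sketched as follows. This is a paraphrase of the pre-change logic, not the PR's final code, and the helper names follow the quoted diff:

```java
public long windowSize(MetricConfig config, long now) {
    stat.purgeObsoleteSamples(config, now);

    // Time elapsed since the oldest non-obsolete sample window began.
    long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;

    // Floor the elapsed time to (samples - 1) complete windows plus the fraction
    // of the current window, so a nearly empty first window cannot inflate the rate.
    int numFullWindows = (int) (totalElapsedTimeMs / config.timeWindowMs());
    int minFullWindows = config.samples() - 1;
    if (numFullWindows < minFullWindows)
        totalElapsedTimeMs += (minFullWindows - numFullWindows) * config.timeWindowMs();

    return totalElapsedTimeMs;
}
```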
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045: URL: https://github.com/apache/kafka/pull/12045#discussion_r897721779

## clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java:

@@ -149,13 +149,14 @@ private void verifyStats(Function<KafkaMetric, Double> metricValueFunc) {
         assertEquals(5.0, metricValueFunc.apply(metrics.metric(metrics.metricName("s2.total", "grp1"))), EPS,
             "s2 reflects the constant value");
-        assertEquals(4.5, metricValueFunc.apply(metrics.metric(metrics.metricName("test.avg", "grp1"))), EPS,
+        assertEquals(sum / (double) count, metricValueFunc.apply(metrics.metric(metrics.metricName("test.avg", "grp1"))), EPS,
             "Avg(0...9) = 4.5");
         assertEquals(count - 1, metricValueFunc.apply(metrics.metric(metrics.metricName("test.max", "grp1"))), EPS,
             "Max(0...9) = 9");
         assertEquals(0.0, metricValueFunc.apply(metrics.metric(metrics.metricName("test.min", "grp1"))), EPS,
             "Min(0...9) = 0");
-        assertEquals(sum / elapsedSecs, metricValueFunc.apply(metrics.metric(metrics.metricName("test.rate", "grp1"))), EPS,
+        // rate is calculated over the first ever window. Hence, we assume presence of prior windows with 0 recorded events.
+        assertEquals((double) sum / elapsedSecs, metricValueFunc.apply(metrics.metric(metrics.metricName("test.rate", "grp1"))), EPS,

Review Comment: Thanks for catching this. I have fixed this in the latest revision.
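The assumption in the added comment is easiest to see with numbers; a worked example under an assumed configuration of two 30-second samples:

```java
double windowSec = 30.0;   // assumed sample window length
int samples = 2;           // assumed sample count
double elapsedSec = 15.0;  // only half of the very first window has passed
double sum = 45.0;         // total recorded value so far

double naiveRate = sum / elapsedSec;                             // 3.0/sec: spikes artificially high
double assumedElapsedSec = (samples - 1) * windowSec + elapsedSec; // 45.0 s, with one assumed empty prior window
double dampedRate = sum / assumedElapsedSec;                     // 1.0/sec
```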
[GitHub] [kafka] fred-ro commented on pull request #12201: MINOR: Replace left single quote with single quote in Connect worker's log message
fred-ro commented on PR #12201: URL: https://github.com/apache/kafka/pull/12201#issuecomment-1156048902

Is it possible to backport it to branch 2.8?
[GitHub] [kafka] tkaszuba commented on a diff in pull request #12293: KAFKA-13963: Clarified java doc for processors api
tkaszuba commented on code in PR #12293: URL: https://github.com/apache/kafka/pull/12293#discussion_r897565836

## streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java:

@@ -30,6 +30,7 @@
  * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one
  * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology
  * {@link Topology#addSource(String, String...) reads} from the same topic.
+ * Processors and Transformers created with the Processor API are treated as black boxes and are not represented in the topology graph.

Review Comment: That is correct; the issue is with `context.forward`.
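To make the `context.forward` point concrete, here is an illustrative processor (not from the PR); the class and child node names are assumptions:

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Illustrative processor: which child receives a record is decided at runtime
// inside process(), so TopologyDescription can only show the node itself as a
// black box, not the routing that happens within it.
public class RoutingProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        // Hypothetical child node names, wired up elsewhere via Topology#addProcessor.
        final String child = record.value().length() > 10 ? "long-values" : "short-values";
        context.forward(record, child); // this edge choice is invisible to the topology description
    }
}
```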