[GitHub] [kafka] socutes commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException
socutes commented on a change in pull request #10749: URL: https://github.com/apache/kafka/pull/10749#discussion_r638494789 ## File path: raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java ## @@ -91,14 +92,22 @@ private QuorumStateData readStateFromFile(File file) throws IOException { final short dataVersion = dataVersionNode.shortValue(); return QuorumStateDataJsonConverter.read(dataObject, dataVersion); +} catch (IOException e) { +throw new UncheckedIOException( +String.format( +"Read the Quorum status exception from the file %s", +file +), +e Review comment: Thank you for your review. It has been modified; please review it again! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] socutes commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException
socutes commented on a change in pull request #10749: URL: https://github.com/apache/kafka/pull/10749#discussion_r638499447 ## File path: raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java ## @@ -67,7 +68,7 @@ public FileBasedStateStore(final File stateFile) { this.stateFile = stateFile; } -private QuorumStateData readStateFromFile(File file) throws IOException { +private QuorumStateData readStateFromFile(File file) { Review comment: The exception has already been caught and handled. Do we still need to throw it up to the caller?
[GitHub] [kafka] socutes commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException
socutes commented on a change in pull request #10749: URL: https://github.com/apache/kafka/pull/10749#discussion_r638494789 ## File path: raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java ## @@ -91,14 +92,22 @@ private QuorumStateData readStateFromFile(File file) throws IOException { final short dataVersion = dataVersionNode.shortValue(); return QuorumStateDataJsonConverter.read(dataObject, dataVersion); +} catch (IOException e) { +throw new UncheckedIOException( +String.format( +"Read the Quorum status exception from the file %s", +file +), +e Review comment: Thank you for your review. I'll revise it.
[GitHub] [kafka] KahnCheny commented on pull request #10746: MINOR: remove unneccessary public keyword from ProducerInterceptor/ConsumerInterceptor interface
KahnCheny commented on pull request #10746: URL: https://github.com/apache/kafka/pull/10746#issuecomment-847563816 Thanks
[jira] [Assigned] (KAFKA-12430) emit.heartbeats.enabled = false should disable heartbeats topic creation
[ https://issues.apache.org/jira/browse/KAFKA-12430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Yurchenko reassigned KAFKA-12430: -- Assignee: (was: Ivan Yurchenko) > emit.heartbeats.enabled = false should disable heartbeats topic creation > > > Key: KAFKA-12430 > URL: https://issues.apache.org/jira/browse/KAFKA-12430 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Reporter: Ivan Yurchenko >Priority: Minor > > Currently, MirrorMaker 2's {{MirrorHeartbeatConnector}} emits heartbeats or > not based on {{emit.heartbeats.enabled}} setting. However, {{heartbeats}} > topic is created unconditionally. It seems that the same setting should > really disable the topic creation as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12430) emit.heartbeats.enabled = false should disable heartbeats topic creation
[ https://issues.apache.org/jira/browse/KAFKA-12430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Yurchenko updated KAFKA-12430: --- Description: Currently, whether MirrorMaker 2's {{MirrorHeartbeatConnector}} emits heartbeats or not is based on {{emit.heartbeats.enabled}} setting. However, {{heartbeats}} topic is created unconditionally. It seems that the same setting should really disable the topic creation as well. (was: Currently, MirrorMaker 2's {{MirrorHeartbeatConnector}} emits heartbeats or not based on {{emit.heartbeats.enabled}} setting. However, {{heartbeats}} topic is created unconditionally. It seems that the same setting should really disable the topic creation as well.) > emit.heartbeats.enabled = false should disable heartbeats topic creation > > > Key: KAFKA-12430 > URL: https://issues.apache.org/jira/browse/KAFKA-12430 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Reporter: Ivan Yurchenko >Priority: Minor > > Currently, whether MirrorMaker 2's {{MirrorHeartbeatConnector}} emits > heartbeats or not is based on {{emit.heartbeats.enabled}} setting. However, > {{heartbeats}} topic is created unconditionally. It seems that the same > setting should really disable the topic creation as well.
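A minimal sketch of the behavior the ticket asks for, assuming a single flag gates both heartbeat emission and creation of the `heartbeats` topic. `HeartbeatTopicPlanner` and its methods are hypothetical and are not the real `MirrorHeartbeatConnector` code; treating `true` as the default for `emit.heartbeats.enabled` is also an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch; not the real MirrorHeartbeatConnector.
class HeartbeatTopicPlanner {
    static final String EMIT_HEARTBEATS_ENABLED = "emit.heartbeats.enabled";
    static final String HEARTBEATS_TOPIC = "heartbeats";

    private final boolean emitHeartbeats;

    HeartbeatTopicPlanner(Properties props) {
        // Assumption: the flag defaults to true when it is not set.
        this.emitHeartbeats = Boolean.parseBoolean(
                props.getProperty(EMIT_HEARTBEATS_ENABLED, "true"));
    }

    boolean shouldEmitHeartbeats() {
        return emitHeartbeats;
    }

    // The same flag decides whether the heartbeats topic is created at all.
    List<String> topicsToCreate() {
        List<String> topics = new ArrayList<>();
        if (emitHeartbeats) {
            topics.add(HEARTBEATS_TOPIC);
        }
        return topics;
    }
}
```

With `emit.heartbeats.enabled=false`, `topicsToCreate()` returns an empty list, so the connector would neither emit heartbeats nor create the topic.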
[GitHub] [kafka] vahidhashemian commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
vahidhashemian commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r638437845 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -637,19 +709,27 @@ private void assignPartition(TopicPartition partition, } } -private boolean canParticipateInReassignment(TopicPartition partition, - Map> partition2AllPotentialConsumers) { +private boolean canParticipateInReassignment(String topic, + Map> topic2AllPotentialConsumers) { // if a partition has two or more potential consumers it is subject to reassignment. Review comment: Comment needs an update. ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -637,19 +709,27 @@ private void assignPartition(TopicPartition partition, } } -private boolean canParticipateInReassignment(TopicPartition partition, - Map> partition2AllPotentialConsumers) { +private boolean canParticipateInReassignment(String topic, + Map> topic2AllPotentialConsumers) { // if a partition has two or more potential consumers it is subject to reassignment. 
-return partition2AllPotentialConsumers.get(partition).size() >= 2; +return topic2AllPotentialConsumers.get(topic).size() >= 2; } private boolean canParticipateInReassignment(String consumer, Map> currentAssignment, - Map> consumer2AllPotentialPartitions, - Map> partition2AllPotentialConsumers) { + Map> consumer2AllPotentialTopics, + Map> topic2AllPotentialConsumers, + Map partitionsPerTopic, + int totalPartitionCount) { List currentPartitions = currentAssignment.get(consumer); int currentAssignmentSize = currentPartitions.size(); -int maxAssignmentSize = consumer2AllPotentialPartitions.get(consumer).size(); +List allSubscribedTopics = consumer2AllPotentialTopics.get(consumer); +int maxAssignmentSize; +if (allSubscribedTopics.size() == partitionsPerTopic.size()) { +maxAssignmentSize = totalPartitionCount; +} else { +maxAssignmentSize = allSubscribedTopics.stream().map(topic -> partitionsPerTopic.get(topic)).reduce(0, Integer::sum); +} Review comment: The same code block appears in lines 638-644. Is it possible to somehow factor it out? 
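One way to answer the reviewer's question is to move the duplicated computation into a static helper that both call sites use. This is a sketch only: the helper name `maxAssignmentSize` and the parameter types (topic names as `String`, partition counts as `Integer`) are assumptions, since the generic type arguments were stripped from the diff above.

```java
import java.util.List;
import java.util.Map;

class AssignmentSizes {
    /**
     * Upper bound on the partitions a consumer can own: the partition count summed over
     * its subscribed topics. If it subscribes to every topic, the precomputed total is reused.
     */
    static int maxAssignmentSize(int totalPartitionCount,
                                 List<String> allSubscribedTopics,
                                 Map<String, Integer> partitionsPerTopic) {
        if (allSubscribedTopics.size() == partitionsPerTopic.size()) {
            return totalPartitionCount;
        }
        return allSubscribedTopics.stream()
                .mapToInt(partitionsPerTopic::get)
                .sum();
    }
}
```

The size comparison is the same shortcut the diff uses, so it assumes the subscription list has no duplicates and no topics missing from `partitionsPerTopic`.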
## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -469,73 +426,190 @@ private boolean allSubscriptionsEqual(Set allTopics, TreeSet sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment)); sortedCurrentSubscriptions.addAll(currentAssignment.keySet()); -balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, -consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired); +balance(currentAssignment, prevAssignment, sortedAllPartitions, unassignedPartitions, sortedCurrentSubscriptions, +consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired, +partitionsPerTopic, totalPartitionsCount); + +if (log.isDebugEnabled()) { +log.debug("final assignment: {}", currentAssignment); +} + return currentAssignment; } +/** + * get the unassigned partition list by computing the difference set of the sortedPartitions(all partitions) + * and sortedAssignedPartitions. If no assigned partitions, we'll just return all sorted topic partitions. + * This is used in generalAssign method + * + * We loop the sortedPartition, and compare the ith element in sortedAssignedPartitions(i start from 0): + * - if not equal to the ith element, add to unassignedPartitions + * - if equal to the the ith element, get next element from sortedAssignedPartitions + * + * @param sortedAllPartitions: sorted all partitions + * @param sortedAssignedPartitions: sorted partitions, all are included in the sortedPartitions + * @param topic2AllPotentialConsumers: topics mapped to all consumers that subscribed to it + * @return partitions that aren't as
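The merge-style loop described in the Javadoc above can be sketched as follows, assuming (as the Javadoc states) that the assigned list is a sorted subset of the full list in the same order. The class name and the generic element type, used here instead of `TopicPartition` to keep the sketch self-contained, are assumptions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class UnassignedPartitions {
    /**
     * Elements of sortedAll that are not in sortedAssigned. Both lists must be sorted the
     * same way, and sortedAssigned must be a subset of sortedAll with no nulls.
     */
    static <T> List<T> difference(List<T> sortedAll, List<T> sortedAssigned) {
        if (sortedAssigned.isEmpty()) {
            return new ArrayList<>(sortedAll); // nothing assigned yet
        }
        List<T> unassigned = new ArrayList<>();
        Iterator<T> assignedIter = sortedAssigned.iterator();
        T nextAssigned = assignedIter.next();
        for (T partition : sortedAll) {
            if (nextAssigned != null && partition.equals(nextAssigned)) {
                // Matched the next assigned element: advance instead of adding it.
                nextAssigned = assignedIter.hasNext() ? assignedIter.next() : null;
            } else {
                unassigned.add(partition);
            }
        }
        return unassigned;
    }
}
```

Walking an iterator over the assigned list, rather than indexing it, keeps the pass linear even if that list is a `LinkedList`.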
[GitHub] [kafka] KahnCheny commented on pull request #10746: MINOR: remove unneccessary public keyword from ProducerInterceptor/ConsumerInterceptor interface
KahnCheny commented on pull request #10746: URL: https://github.com/apache/kafka/pull/10746#issuecomment-847490202 > @KahnCheny , thanks for the PR. LGTM! Could you also remove the `public` keyword in `ConsumerInterceptor` ? Thank you. Of course.
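Interface methods declared without an access modifier are implicitly `public` (and, unless they are `default` or `static`, implicitly `abstract`), which is why the keyword is redundant. A small reflection check makes this visible; `InterceptorSketch` is a hypothetical stand-in, not the real `ProducerInterceptor` or `ConsumerInterceptor`.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical interface; its methods deliberately omit the `public` keyword.
interface InterceptorSketch<R> {
    R onSend(R record);

    void close();
}

class InterfaceModifierCheck {
    // True if every method declared directly on the interface is public and abstract.
    static boolean allPublicAbstract(Class<?> iface) {
        for (Method m : iface.getDeclaredMethods()) {
            int mods = m.getModifiers();
            if (!Modifier.isPublic(mods) || !Modifier.isAbstract(mods)) {
                return false;
            }
        }
        return true;
    }
}
```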
[GitHub] [kafka] showuon commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor, replace TaskId#parse and add tests
showuon commented on a change in pull request #10755: URL: https://github.com/apache/kafka/pull/10755#discussion_r638426531 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java ## @@ -40,6 +40,18 @@ private final Optional timeCurrentIdlingStarted; +/** + * @deprecated since 3.0, please use the constructor that accepts a TaskId object instead of a String Review comment: nit: it would be clearer to link to the constructor here, e.g.: ``` @deprecated since 3.0, please use {@link #TaskMetadata(TaskId, Set, Map, Map, Optional)} instead ```
[GitHub] [kafka] showuon commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException
showuon commented on a change in pull request #10749: URL: https://github.com/apache/kafka/pull/10749#discussion_r638422416 ## File path: raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java ## @@ -91,14 +92,22 @@ private QuorumStateData readStateFromFile(File file) throws IOException { final short dataVersion = dataVersionNode.shortValue(); return QuorumStateDataJsonConverter.read(dataObject, dataVersion); +} catch (IOException e) { +throw new UncheckedIOException( +String.format( +"Read the Quorum status exception from the file %s", +file +), +e Review comment: Also, could you rephrase the error message, e.g. `Error while reading the Quorum status from the file`? The same applies to the other places. ## File path: raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java ## @@ -91,14 +92,22 @@ private QuorumStateData readStateFromFile(File file) throws IOException { final short dataVersion = dataVersionNode.shortValue(); return QuorumStateDataJsonConverter.read(dataObject, dataVersion); +} catch (IOException e) { +throw new UncheckedIOException( +String.format( +"Read the Quorum status exception from the file %s", +file +), +e Review comment: The indentation is not consistent with the rest of the code. Also, do we need to break the exception across 6 lines? I think this would be enough: ```java throw new UncheckedIOException( String.format("Read the Quorum status exception from the file %s", file), e); ``` The same applies to the other places. ## File path: raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java ## @@ -67,7 +68,7 @@ public FileBasedStateStore(final File stateFile) { this.stateFile = stateFile; } -private QuorumStateData readStateFromFile(File file) throws IOException { +private QuorumStateData readStateFromFile(File file) { Review comment: Should we replace `throws IOException` with `throws UncheckedIOException` here? The same applies to the other places.
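A self-contained sketch of the wrapping pattern under review, using the message wording suggested in the review and keeping the original `IOException` as the cause. It reads the file as plain text rather than parsing JSON, and `StateFileReader` is a hypothetical name. Because `UncheckedIOException` is unchecked, the compiler does not require it in a `throws` clause; listing it there is legal, but documenting it with a Javadoc `@throws` tag is a common alternative.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

class StateFileReader {
    /**
     * Reads the whole state file as UTF-8 text.
     *
     * @throws UncheckedIOException if the file cannot be read; the original
     *         IOException is kept as the cause
     */
    static String readStateFromFile(File file) {
        try {
            return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(
                String.format("Error while reading the Quorum status from the file %s", file), e);
        }
    }
}
```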
[GitHub] [kafka] ableegoldman commented on pull request #10755: MINOR: deprecate TaskMetadata constructor, replace TaskId#parse and add tests
ableegoldman commented on pull request #10755: URL: https://github.com/apache/kafka/pull/10755#issuecomment-847495713 All tests passed except for unrelated flaky `kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testRemovingWorker()`
[jira] [Created] (KAFKA-12845) Rollback change which requires join key to be non null on KStream->GlobalKTable
Pedro Gontijo created KAFKA-12845: - Summary: Rollback change which requires join key to be non null on KStream->GlobalKTable Key: KAFKA-12845 URL: https://issues.apache.org/jira/browse/KAFKA-12845 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.7.0 Reporter: Pedro Gontijo As part of [KAFKA-10277|https://issues.apache.org/jira/browse/KAFKA-10277], the behavior of KStream->GlobalKTable joins was changed to require non-null join keys. But it seems reasonable that not every record will have an existing relationship (and hence a key) with the joined GlobalKTable. Think of User->Car, for instance, or PageView->Product. The KeyMapper could return an empty/zero key instead, but that would trigger a completely unnecessary lookup in the store. I do not think that makes sense for any GlobalKTable join (inner or left), and for a left join it seems even stranger.
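The ticket's argument can be illustrated without Kafka: in a left join, a record whose key mapper yields no key has nothing to look up, so it can be emitted with a `null` right-hand side instead of forcing a lookup with a placeholder key. This is a plain-Java sketch; `LeftJoinSketch` and its lookup counter are hypothetical and do not reflect how Kafka Streams implements the join.

```java
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * Plain-Java illustration of a stream-to-table left join. The Map stands in for the
 * table's store; this is not the Kafka Streams API.
 */
class LeftJoinSketch<V, GK, GV, R> {
    private final Map<GK, GV> table;
    private int storeLookups = 0;

    LeftJoinSketch(Map<GK, GV> table) {
        this.table = table;
    }

    R leftJoin(V value, Function<V, GK> keyMapper, BiFunction<V, GV, R> joiner) {
        GK joinKey = keyMapper.apply(value);
        if (joinKey == null) {
            // No relationship for this record: emit it with a null right side, no lookup.
            return joiner.apply(value, null);
        }
        storeLookups++;
        return joiner.apply(value, table.get(joinKey));
    }

    int storeLookups() {
        return storeLookups;
    }
}
```

For a PageView->Product join, a page view with no product would then cost no store access at all.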
[GitHub] [kafka] ableegoldman commented on a change in pull request #10609: KAFKA-12648: Pt. 1 - Add NamedTopology to protocol and state directory structure
ableegoldman commented on a change in pull request #10609: URL: https://github.com/apache/kafka/pull/10609#discussion_r638396898 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -387,18 +422,18 @@ public synchronized void cleanRemovedTasks(final long cleanupDelayMs) { } private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { -for (final File taskDir : listNonEmptyTaskDirectories()) { Review comment: Just want to call this out since it's a change in behavior unrelated to this PR; it's something we could/should have cleaned up after removing the lock/file-based locking. Previously the cleaner thread could never delete empty task dirs (due to that Windows bug); now it can, so we should not exclude empty dirs here.
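The behavior change called out above can be illustrated with a small directory-listing helper that either keeps or skips empty task directories. This is a sketch only; `TaskDirectoryListing` and its `includeEmpty` flag are hypothetical, and the real `StateDirectory` does more than this (for example, it takes `cleanupDelayMs` into account), which is omitted here.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

class TaskDirectoryListing {
    // Lists subdirectories of stateDir; skips empty ones unless includeEmpty is true.
    static List<File> listTaskDirectories(File stateDir, boolean includeEmpty) {
        List<File> result = new ArrayList<>();
        File[] children = stateDir.listFiles(File::isDirectory);
        if (children == null) {
            return result; // stateDir is missing or not a directory
        }
        for (File dir : children) {
            String[] contents = dir.list();
            boolean empty = contents == null || contents.length == 0;
            if (includeEmpty || !empty) {
                result.add(dir);
            }
        }
        return result;
    }
}
```

Passing `includeEmpty = false` corresponds to the old `listNonEmptyTaskDirectories()` behavior; `true` lets the cleaner see empty directories too.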
[GitHub] [kafka] ableegoldman edited a comment on pull request #10609: KAFKA-12648: Pt. 1 - Add NamedTopology to protocol and state directory structure
ableegoldman edited a comment on pull request #10609: URL: https://github.com/apache/kafka/pull/10609#issuecomment-845638871 Rebased after the TaskId changes in KIP-470, and responded to all comments. Not much has changed since the last review, just cleaning up here and there. ~It's pretty much done except for StateDirectoryTest, which I can always do in a quick followup PR to unblock other downstream work with this~ edit: tests are done, this PR is fully ready for review and merge
[GitHub] [kafka] sasukerui closed pull request #10751: MINOR: update java doc for ConsumerCoordinator
sasukerui closed pull request #10751: URL: https://github.com/apache/kafka/pull/10751
[GitHub] [kafka] ableegoldman commented on pull request #10755: MINOR: deprecate TaskMetadata constructor, replace TaskId#parse and add tests
ableegoldman commented on pull request #10755: URL: https://github.com/apache/kafka/pull/10755#issuecomment-847442114 cc @mjsax
[GitHub] [kafka] ableegoldman merged pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer
ableegoldman merged pull request #10690: URL: https://github.com/apache/kafka/pull/10690
[jira] [Commented] (KAFKA-12835) Topic IDs can mismatch on brokers (after interbroker protocol version update)
[ https://issues.apache.org/jira/browse/KAFKA-12835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350684#comment-17350684 ] Justine Olshan commented on KAFKA-12835: Hi [~ivanyu]. Previously, we could lose topic IDs in the ZNode when reassigning partitions. This could happen if the controller switched from one with IBP 2.8 to one with IBP 2.7 (where the partitions were reassigned) and then back to one with IBP 2.8. If the partition.metadata file still existed, it would contain the old ID, but the new controller would see that the topic ID was missing from the ZNode and assign a new one. I've opened a PR to prevent this loss of topic ID regardless of the controller's IBP. > Topic IDs can mismatch on brokers (after interbroker protocol version update) > - > > Key: KAFKA-12835 > URL: https://issues.apache.org/jira/browse/KAFKA-12835 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.0 >Reporter: Ivan Yurchenko >Assignee: Justine Olshan >Priority: Major > > We had a Kafka cluster running 2.8 version with interbroker protocol set to > 2.7. It had a number of topics and everything was fine. > Then we decided to update the interbroker protocol to 2.8 by the following > procedure: > 1. Run new brokers with the interbroker protocol set to 2.8. > 2. Move the data from the old brokers to the new ones (normal partition > reassignment API). > 3. Decommission the old brokers. > At the stage 2 we had the problem: old brokers started failing on > {{LeaderAndIsrRequest}} handling with > {code:java} > ERROR [Broker id=<...>] Topic Id in memory: <...> does not match the topic Id > for partition <...> provided in the request: <...>. (state.change.logger) > {code} > for multiple topics. Topics were not recreated. > We checked {{partition.metadata}} files and IDs there were indeed different > from the values in ZooKeeper. It was fixed by deleting the metadata files > (and letting them be recreated).
> > The logs, unfortunately, didn't show anything that might point to the cause > of the issue (or it happened longer ago than we store the logs). > We tried to reproduce this also, but no success. > If the community can point out what to check or beware of in future, it will > be great. We'll be happy to provide additional information if needed. Thank > you! > Sorry for the ticket that might not be very actionable. We hope to at least > raise awareness of this issue. >
[GitHub] [kafka] ableegoldman edited a comment on pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer
ableegoldman edited a comment on pull request #10690: URL: https://github.com/apache/kafka/pull/10690#issuecomment-847413919 Some unrelated test failures in `RaftClusterTest`, `connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector()`, and `kafka.api.PlaintextConsumerTest.testPartitionsFor()` Will merge to trunk
[GitHub] [kafka] ableegoldman commented on pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer
ableegoldman commented on pull request #10690: URL: https://github.com/apache/kafka/pull/10690#issuecomment-847413919 Some unrelated test failures in `RaftClusterTest`, `connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector()`, and `kafka.api.PlaintextConsumerTest.testPartitionsFor()`
[GitHub] [kafka] ijuma closed pull request #10497: KAFKA-12342; Merge RaftClient and MetaLogManager interfaces and remove shim
ijuma closed pull request #10497: URL: https://github.com/apache/kafka/pull/10497
[GitHub] [kafka] ableegoldman commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor, replace TaskId#parse and add tests
ableegoldman commented on a change in pull request #10755: URL: https://github.com/apache/kafka/pull/10755#discussion_r638336725 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java ## @@ -80,6 +83,35 @@ public String toString() { return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition; } +/** + * @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId} + */ +public static TaskId parse(final String taskIdStr) { Review comment: I noticed this had been (re)moved since 2.8, so I put it back with updates to handle named topologies, plus tests, which it did not seem to have before.
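A sketch of a `parse` that inverts the `toString()` format shown in the diff (`<topicGroupId>_<partition>` or `<namedTopology>_<topicGroupId>_<partition>`). `TaskIdSketch` is hypothetical: it throws `IllegalArgumentException` where the real method throws `TaskIdFormatException`, and it keeps its fields private behind `subtopology()`/`partition()` accessors in the spirit of KAFKA-12844. Splitting from the right is a design choice of this sketch that tolerates underscores inside a named topology; whether Kafka permits such names is not established here.

```java
/**
 * Hypothetical simplified task id mirroring the toString() format in the diff:
 * "<subtopology>_<partition>" or "<namedTopology>_<subtopology>_<partition>".
 */
final class TaskIdSketch {
    private final String namedTopology; // null when there is no named topology
    private final int subtopology;
    private final int partition;

    TaskIdSketch(String namedTopology, int subtopology, int partition) {
        this.namedTopology = namedTopology;
        this.subtopology = subtopology;
        this.partition = partition;
    }

    String namedTopology() { return namedTopology; }
    int subtopology() { return subtopology; }
    int partition() { return partition; }

    @Override
    public String toString() {
        return namedTopology != null
                ? namedTopology + "_" + subtopology + "_" + partition
                : subtopology + "_" + partition;
    }

    /** Inverse of toString(); splits from the right so a named topology may contain '_'. */
    static TaskIdSketch parse(String taskIdStr) {
        int lastSep = taskIdStr.lastIndexOf('_');
        if (lastSep <= 0) {
            throw new IllegalArgumentException("Invalid task id: " + taskIdStr);
        }
        int prevSep = taskIdStr.lastIndexOf('_', lastSep - 1);
        if (prevSep == 0) {
            throw new IllegalArgumentException("Empty named topology in task id: " + taskIdStr);
        }
        try {
            int subtopology = Integer.parseInt(taskIdStr.substring(prevSep + 1, lastSep));
            int partition = Integer.parseInt(taskIdStr.substring(lastSep + 1));
            String namedTopology = prevSep < 0 ? null : taskIdStr.substring(0, prevSep);
            return new TaskIdSketch(namedTopology, subtopology, partition);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid task id: " + taskIdStr, e);
        }
    }
}
```

A round-trip test (`parse(id.toString())` yields an equal id) is a natural check for whichever real implementation lands.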
[GitHub] [kafka] ableegoldman opened a new pull request #10755: MINOR: deprecate TaskMetadata constructor, replace TaskId#parse and add tests
ableegoldman opened a new pull request #10755: URL: https://github.com/apache/kafka/pull/10755 Quick follow-up to KIP-740. I also noticed that the TaskId#parse method had been modified previously and should be re-added to the public TaskId class. It also had no tests, so now it does.
[jira] [Created] (KAFKA-12844) KIP-740 follow up: clean up TaskId
A. Sophie Blee-Goldman created KAFKA-12844: -- Summary: KIP-740 follow up: clean up TaskId Key: KAFKA-12844 URL: https://issues.apache.org/jira/browse/KAFKA-12844 Project: Kafka Issue Type: Task Components: streams Reporter: A. Sophie Blee-Goldman Fix For: 4.0.0 See [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557] – for the TaskId class, we need to remove the following deprecated APIs: # The public partition and topicGroupId fields should be "removed", i.e. made private (we can also now rename topicGroupId to subtopology to match the getter) # The two #readFrom and two #writeTo methods can be removed (they have already been converted to internal utility methods that we now use instead, so just remove them)
[jira] [Created] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata
A. Sophie Blee-Goldman created KAFKA-12843: -- Summary: KIP-740 follow up: clean up TaskMetadata Key: KAFKA-12843 URL: https://issues.apache.org/jira/browse/KAFKA-12843 Project: Kafka Issue Type: Task Components: streams Reporter: A. Sophie Blee-Goldman Fix For: 4.0.0 See [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557] – for the TaskMetadata class, we need to: # Deprecate the TaskMetadata#getTaskId method # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() API that returns a TaskId instead of a String # Remove the deprecated constructor
[jira] [Commented] (KAFKA-12838) Kafka Broker - Request threads inefficiently blocking during produce
[ https://issues.apache.org/jira/browse/KAFKA-12838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350654#comment-17350654 ] Ryan Cabral commented on KAFKA-12838: - Yes, increasing the number of partitions can help reduce contention, but it won't eliminate it. Like increasing the number of request/IO threads, it mitigates the issue somewhat. The core of the problem, though, is that requests for the same partition are dispatched simultaneously, each tying up a request/IO thread and causing requests for other partitions to wait behind them, even though those requests need a different lock. That means just two producers to the same partition can hurt the entire broker's throughput rather than just that partition's throughput. > Kafka Broker - Request threads inefficiently blocking during produce > > > Key: KAFKA-12838 > URL: https://issues.apache.org/jira/browse/KAFKA-12838 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.7.0, 2.8.0 >Reporter: Ryan Cabral >Priority: Major > > Hello, I have been using Kafka brokers for a bit and have run into a problem > with the way a kafka broker handles produce requests. If there are multiple > producers to the same topic and partition, any request handler threads > handling the produce for that topic and partition become blocked until all > requests before it are done. Request handler threads for the entire broker > can become exhausted waiting on the same partition lock, blocking requests > for other partitions that would not have needed the same lock. > Once that starts happening, requests start to back up, queued requests can > reach its maximum and network threads begin to be paused cascading the > problem a bit more. Overall performance ends up being degraded. I'm not so > focused on the cascade at the moment as I am the initial contention.
> Intuitively I would expect locking contention on a single partition to ONLY > affect throughput on that partition and not the entire broker. > > The append call within the request handler originates here: > [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/KafkaApis.scala#L638] > Further down the stack the lock during append is created here: > [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/log/Log.scala#L1165] > At this point the first request will hold the lock during append and future > requests on the same partition will block, waiting for the lock, tying up an > io thread (request handler). > At first glance, it seems like it would make the most sense to (via config?) > be able to funnel (produce) requests for the same partition through its own > request queue of sorts and dispatch them such that at most one io thread is > tied up at a time for a given partition. There are a number of reasons the > lock could be held elsewhere too but this should at least help mitigate the > issue a bit. I'm assuming this is easier said than done though and likely > requires significant refactoring to properly achieve but hoping this is > something that could end up on some sort of long term roadmap. > > Snippet from jstack. Almost all request handlers threads (there are 256 of > them, up from 25 to mitigate the issue) in the jstack are blocked waiting on > the same lock due to the number of producers we have. 
> > {noformat} > "data-plane-kafka-request-handler-254" #335 daemon prio=5 os_prio=0 > tid=0x7fb1c9f13000 nid=0x53f1 runnable [0x7fad35796000] > java.lang.Thread.State: RUNNABLE > at > org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.(KafkaLZ4BlockOutputStream.java:82) > at > org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.(KafkaLZ4BlockOutputStream.java:125) > at > org.apache.kafka.common.record.CompressionType$4.wrapForOutput(CompressionType.java:101) > at > org.apache.kafka.common.record.MemoryRecordsBuilder.(MemoryRecordsBuilder.java:134) > at > org.apache.kafka.common.record.MemoryRecordsBuilder.(MemoryRecordsBuilder.java:170) > at > org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:508) > at > kafka.log.LogValidator$.buildRecordsAndAssignOffsets(LogValidator.scala:500) > at > kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:455) > at > kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:106) > at kafka.log.Log.$anonfun$append$2(Log.scala:1126) > - locked <0x0004c9a6fd60> (a java.lang.Object) > at kafka.log.Log.append(Log.scala:2387) > at kafka.log.Log.appendAsLeader(Log.scala:1050) > at > kafka.cluster.Partitio
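The reporter's proposal (funnel produce requests for one partition through their own queue so at most one handler thread is tied up per partition) can be sketched with per-key `CompletableFuture` chaining. This is a hypothetical illustration of the idea, not how the broker's request handlers are structured; `PerKeySerialExecutor` and its `submit` method are invented for this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/**
 * Hypothetical per-key serializer: tasks sharing a key run one at a time in submission
 * order, but a queued task waits as a continuation instead of parking a pool thread on a lock.
 */
class PerKeySerialExecutor<K> {
    private final Executor pool;
    // Tail of each key's chain. Entries are never removed in this sketch.
    private final ConcurrentHashMap<K, CompletableFuture<?>> tails = new ConcurrentHashMap<>();

    PerKeySerialExecutor(Executor pool) {
        this.pool = pool;
    }

    <T> CompletableFuture<T> submit(K key, Supplier<T> task) {
        @SuppressWarnings("unchecked")
        CompletableFuture<T> next = (CompletableFuture<T>) tails.compute(key, (k, tail) -> {
            CompletableFuture<?> prev = tail != null ? tail : CompletableFuture.completedFuture(null);
            // handle() lets the chain continue even if the previous task failed.
            return prev.handle((result, error) -> null).thenApplyAsync(ignored -> task.get(), pool);
        });
        return next;
    }
}
```

Tasks for different keys (partitions) are never queued behind each other, so a hot partition occupies at most one pool thread at a time; a real design would also need to evict idle keys and bound each queue.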
[GitHub] [kafka] jolshan commented on pull request #10754: KAFKA-12835: Topic IDs can mismatch on brokers (after interbroker protocol version update)
jolshan commented on pull request #10754: URL: https://github.com/apache/kafka/pull/10754#issuecomment-847389546 Hmm, it looks like `KafkaMetadataLogTest.testTopicId` is failing because we set `keepPartitionMetadataFile` to false. When I ensure that we only assign a topic ID when `keepPartitionMetadataFile` is true, we do not assign a topic ID to the log. Since we rely on the in-memory assignment and the file being consistent, one option is to write a partition.metadata file for the metadata topic. It won't be used like the other partition.metadata files, but it might make it easier to keep all logs consistent.
[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350650#comment-17350650 ] A. Sophie Blee-Goldman commented on KAFKA-9168: --- Yep, all the rocksdb work waiting on more recent versions has been unblocked. Disclaimer: I don't have much context on this particular task or how, or even whether, it's something we can do for current features. It may be that the direct buffers are only useful for new features that we were considering, and not anything in the current codebase. But I really have no idea – just wanted to clarify that figuring this out is I guess the first part of this ticket :) > Integrate JNI direct buffer support to RocksDBStore > --- > > Key: KAFKA-9168 > URL: https://issues.apache.org/jira/browse/KAFKA-9168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: perfomance > > There has been a PR created on rocksdb Java client to support direct > ByteBuffers in Java. We can look at integrating it whenever it gets merged. > Link to PR: [https://github.com/facebook/rocksdb/pull/2283] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas
junrao commented on a change in pull request #10684: URL: https://github.com/apache/kafka/pull/10684#discussion_r638164890 ## File path: core/src/main/scala/kafka/log/LogSegments.scala ## @@ -210,11 +209,23 @@ class LogSegments(topicPartition: TopicPartition) { * @return the entry associated with the greatest offset, if it exists. */ @threadsafe - def lastEntry: Option[Map.Entry[JLong, LogSegment]] = Option(segments.lastEntry) + def lastEntry: Option[Map.Entry[Long, LogSegment]] = Option(segments.lastEntry) /** * @return the log segment with the greatest offset, if it exists. */ @threadsafe def lastSegment: Option[LogSegment] = lastEntry.map(_.getValue) + + /** + * @return an iterable with log segments ordered from lowest base offset to highest, + * each segment returned has a base offset strictly greater than the provided baseOffset. + */ + def higherSegments(baseOffset: Long): Iterable[LogSegment] = { +val view = + Option(segments.higherKey(baseOffset)).map { +higherOffset => segments.tailMap(higherOffset, true) + }.getOrElse(new ConcurrentSkipListMap[Long, LogSegment]()) Review comment: Could we return a constant empty map? ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1217,10 +1213,8 @@ class Log(@volatile private var _dir: File, fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage) if (fetchDataInfo != null) { if (includeAbortedTxns) - fetchDataInfo = addAbortedTransactions(startOffset, segmentEntry, fetchDataInfo) - } else segmentEntryOpt = segments.higherEntry(baseOffset) - - done = fetchDataInfo != null || segmentEntryOpt.isEmpty + fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo) + } else segmentOpt = segmentsIterator.nextOption() Review comment: The old logic supports skipping forward multiple segments to find the right data. The new logic seems to only support skipping forward once. It would be useful to preserve the original semantic. 
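A minimal Java sketch of the two review points above, assuming a simplified segment model: `Seg`, `SegmentReadSketch`, `higherSegments` and `read` are hypothetical stand-ins, not Kafka's `LogSegments` or `Log` code. It shares one constant empty view (from `Collections.emptyNavigableMap()`) instead of allocating a new `ConcurrentSkipListMap` on every call, and its read loop keeps advancing through as many later segments as needed, which is the original semantic the reviewer asks to preserve.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical stand-in for a log segment: a base offset plus the offsets it still holds
// (a compacted segment may hold none at all).
final class Seg {
    final long baseOffset;
    final NavigableSet<Long> offsets = new TreeSet<>();

    Seg(long baseOffset, long... present) {
        this.baseOffset = baseOffset;
        for (long offset : present) offsets.add(offset);
    }

    // First offset >= startOffset in this segment, or null if there is none.
    Long read(long startOffset) {
        return offsets.ceiling(startOffset);
    }
}

final class SegmentReadSketch {
    // One shared, immutable empty view instead of a fresh map per call.
    private static final NavigableMap<Long, Seg> EMPTY = Collections.emptyNavigableMap();

    // Segments whose base offset is strictly greater than baseOffset, lowest first.
    static Collection<Seg> higherSegments(ConcurrentSkipListMap<Long, Seg> segments, long baseOffset) {
        Long higherKey = segments.higherKey(baseOffset);
        NavigableMap<Long, Seg> view = higherKey == null ? EMPTY : segments.tailMap(higherKey, true);
        return view.values();
    }

    // Starts at the segment containing startOffset and keeps skipping forward,
    // possibly over several segments, until one of them returns data.
    static Long read(ConcurrentSkipListMap<Long, Seg> segments, long startOffset) {
        Map.Entry<Long, Seg> floor = segments.floorEntry(startOffset);
        if (floor == null) return null;
        Iterator<Seg> later = higherSegments(segments, floor.getKey()).iterator();
        Seg segment = floor.getValue();
        while (segment != null) {
            Long found = segment.read(startOffset);
            if (found != null) return found;
            segment = later.hasNext() ? later.next() : null;
        }
        return null;
    }
}
```

With segments at base offsets 0, 10, 20 and 30 where the middle two are empty, reading from offset 5 skips two segments and returns 30. As a design alternative, `segments.tailMap(baseOffset, false)` yields the strictly-higher view directly and never needs the empty fallback.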
[GitHub] [kafka] jolshan opened a new pull request #10754: KAFKA-12835: Topic IDs can mismatch on brokers (after interbroker protocol version update)
jolshan opened a new pull request #10754: URL: https://github.com/apache/kafka/pull/10754 Upon upgrading to IBP 2.8, a topic ID can end up being reassigned, which can cause errors in LeaderAndIsr handling when the partition metadata files from the previous ID are still on the broker. Topic IDs are stored in the TopicZNode. The behavior of the code before this fix is as follows: When we have a controller with an older IBP version and we reassign partitions, the TopicZNode is overwritten and we lose the topic ID. Upon electing a 2.8 IBP controller, we will see that the TopicZNode is missing a topic ID and will generate a new one. If the broker still has the old partition metadata file, we will see an ID mismatch that causes the error. This PR changes the controller logic so that we maintain the topic ID in the controller and the ZNode even when IBP < 2.8. This means that in the scenario above, reassigning partitions will not result in the topic ID being lost and reassigned. Topic IDs may be lost when downgrading the code below version 2.8, but upon re-upgrading to code version 2.8, before bumping the IBP, all partition metadata files will be deleted to prevent any errors. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
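To make the before/after behavior described in this PR concrete, here is a hedged Java sketch of the controller's choice of which topic ID to write into the TopicZNode. The class and method names are hypothetical, `java.util.UUID` stands in for Kafka's own ID type, and a boolean stands in for the "IBP >= 2.8" check; it is an illustration of the rule as the PR states it, not the controller code.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical model of the topic ID decision made when the controller rewrites a TopicZNode.
final class TopicIdSketch {
    // Behavior before the fix, as the PR describes it: a controller on an older IBP
    // rewrites the znode without the ID, and a 2.8 controller later mints a new one.
    static Optional<UUID> topicIdToWriteBeforeFix(Optional<UUID> existingId, boolean ibpSupportsTopicIds) {
        if (!ibpSupportsTopicIds) {
            return Optional.empty(); // the existing ID is lost here
        }
        return existingId.isPresent() ? existingId : Optional.of(UUID.randomUUID());
    }

    // Behavior after the fix: an existing ID is always carried over, even when IBP < 2.8;
    // a new ID is generated only when none exists and the IBP supports topic IDs.
    static Optional<UUID> topicIdToWrite(Optional<UUID> existingId, boolean ibpSupportsTopicIds) {
        if (existingId.isPresent()) {
            return existingId;
        }
        return ibpSupportsTopicIds ? Optional.of(UUID.randomUUID()) : Optional.empty();
    }
}
```

Under the old rule, a reassignment on an older-IBP controller followed by a 2.8 controller yields a different ID than the one recorded in the brokers' partition.metadata files; under the new rule the ID survives the rewrite.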
[jira] [Commented] (KAFKA-12838) Kafka Broker - Request threads inefficiently blocking during produce
[ https://issues.apache.org/jira/browse/KAFKA-12838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350606#comment-17350606 ] Ryanne Dolan commented on KAFKA-12838: -- Would it help to significantly increase the number of partitions you're writing to? > Kafka Broker - Request threads inefficiently blocking during produce > > > Key: KAFKA-12838 > URL: https://issues.apache.org/jira/browse/KAFKA-12838 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.7.0, 2.8.0 >Reporter: Ryan Cabral >Priority: Major > > Hello, I have been using Kafka brokers for a bit and have run into a problem > with the way a Kafka broker handles produce requests. If there are multiple > producers to the same topic and partition, any request handler threads > handling the produce for that topic and partition become blocked until all > requests before them are done. Request handler threads for the entire broker > can become exhausted waiting on the same partition lock, blocking requests > for other partitions that would not have needed the same lock. > Once that starts happening, requests start to back up, the request queue can > reach its maximum, and network threads begin to be paused, cascading the > problem a bit more. Overall performance ends up being degraded. I'm not so > focused on the cascade at the moment as I am on the initial contention. > Intuitively I would expect locking contention on a single partition to ONLY > affect throughput on that partition and not the entire broker.
> > The append call within the request handler originates here: > [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/KafkaApis.scala#L638] > Further down the stack, the lock held during append is acquired here: > [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/log/Log.scala#L1165] > At this point the first request will hold the lock during append and subsequent > requests on the same partition will block, waiting for the lock, tying up an > io thread (request handler). > At first glance, it seems like it would make the most sense to (via config?) > be able to funnel (produce) requests for the same partition through its own > request queue of sorts and dispatch them such that at most one io thread is > tied up at a time for a given partition. There are a number of reasons the > lock could be held elsewhere too, but this should at least help mitigate the > issue a bit. I'm assuming this is easier said than done, though, and likely > requires significant refactoring to achieve properly, but I'm hoping this is > something that could end up on some sort of long-term roadmap. > > Snippet from jstack. Almost all request handler threads (there are 256 of > them, up from 25 to mitigate the issue) in the jstack are blocked waiting on > the same lock due to the number of producers we have.
> > {noformat} > "data-plane-kafka-request-handler-254" #335 daemon prio=5 os_prio=0 > tid=0x7fb1c9f13000 nid=0x53f1 runnable [0x7fad35796000] > java.lang.Thread.State: RUNNABLE > at > org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.(KafkaLZ4BlockOutputStream.java:82) > at > org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.(KafkaLZ4BlockOutputStream.java:125) > at > org.apache.kafka.common.record.CompressionType$4.wrapForOutput(CompressionType.java:101) > at > org.apache.kafka.common.record.MemoryRecordsBuilder.(MemoryRecordsBuilder.java:134) > at > org.apache.kafka.common.record.MemoryRecordsBuilder.(MemoryRecordsBuilder.java:170) > at > org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:508) > at > kafka.log.LogValidator$.buildRecordsAndAssignOffsets(LogValidator.scala:500) > at > kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:455) > at > kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:106) > at kafka.log.Log.$anonfun$append$2(Log.scala:1126) > - locked <0x0004c9a6fd60> (a java.lang.Object) > at kafka.log.Log.append(Log.scala:2387) > at kafka.log.Log.appendAsLeader(Log.scala:1050) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953) > at kafka.server.ReplicaManager$$Lambda$1078/1017241486.apply(Unknown > Source) > at > scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28) > at > scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27) > at scala.collection.mutable.HashMa
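The reporter's idea in KAFKA-12838 of funnelling produce requests for the same partition through their own queue, so that at most one io thread is tied up per partition, can be sketched as a keyed serial executor. This is a hypothetical Java illustration, not broker code: `PerKeySerialExecutor` is an invented name, a `Runnable` stands in for a produce request, and the key would be the topic-partition.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

// Hypothetical keyed serial executor: tasks with the same key run one at a time and in
// submission order, while tasks for different keys can run in parallel on the shared pool.
// No pool thread ever blocks waiting for a busy key; the task waits in that key's queue.
final class PerKeySerialExecutor<K> {
    private final Executor pool;
    // A key is present here exactly while a drain task for it is scheduled or running.
    private final Map<K, ArrayDeque<Runnable>> pending = new HashMap<>();

    PerKeySerialExecutor(Executor pool) {
        this.pool = pool;
    }

    void submit(K key, Runnable task) {
        boolean startDrain = false;
        synchronized (pending) {
            ArrayDeque<Runnable> queue = pending.get(key);
            if (queue == null) {
                queue = new ArrayDeque<>();
                pending.put(key, queue);
                startDrain = true; // nobody is draining this key yet
            }
            queue.add(task);
        }
        if (startDrain) {
            pool.execute(() -> drain(key));
        }
    }

    // Runs queued tasks for one key until its queue is empty, then releases the key.
    private void drain(K key) {
        while (true) {
            Runnable next;
            synchronized (pending) {
                next = pending.get(key).poll();
                if (next == null) {
                    pending.remove(key);
                    return;
                }
            }
            try {
                next.run();
            } catch (RuntimeException e) {
                // A failing task must not stall the rest of this key's queue;
                // a real implementation would report the error instead of ignoring it.
            }
        }
    }
}
```

Submitting 50 tasks under one key to a 4-thread pool runs them strictly one after another and in order, leaving the other threads free for other keys. The queues in this sketch are unbounded, and, as the reporter notes, it would not help when the lock is held for other reasons.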
[GitHub] [kafka] fqaiser94 commented on a change in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution
fqaiser94 commented on a change in pull request #10747: URL: https://github.com/apache/kafka/pull/10747#discussion_r637657442 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java ## @@ -45,34 +45,30 @@ public void setIfUnset(final Serializer defaultKeySerializer, final Serial } /** - * @throws StreamsException if both old and new values of data are null, or if - * both values are not null + * @throws StreamsException if both old and new values of data are null. */ @Override public byte[] serialize(final String topic, final Headers headers, final Change data) { -final byte[] serializedKey; +final boolean oldValueIsNull = data.oldValue == null; +final boolean newValueIsNull = data.newValue == null; -// only one of the old / new values would be not null -if (data.newValue != null) { -if (data.oldValue != null) { -throw new StreamsException("Both old and new values are not null (" + data.oldValue -+ " : " + data.newValue + ") in ChangeSerializer, which is not allowed."); -} - -serializedKey = inner.serialize(topic, headers, data.newValue); +// both old and new values cannot be null +if (oldValueIsNull && newValueIsNull) { +throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed."); } else { -if (data.oldValue == null) { -throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed."); -} - -serializedKey = inner.serialize(topic, headers, data.oldValue); +final byte[] newData = newValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.newValue); +final byte[] oldData = oldValueIsNull ? 
new byte[0] : inner.serialize(topic, headers, data.oldValue); + +final int newDataLength = newData.length; +final int capacity = NEW_DATA_LENGTH_BYTES_SIZE + newDataLength + oldData.length; + +return ByteBuffer +.allocate(capacity) +.putInt(newDataLength) +.put(newData) +.put(oldData) +.array(); Review comment: With the change to the `ChangedSerializer` and `ChangedDeserializer` classes, I don’t think users will be able to just upgrade from a previous version of Kafka Streams easily. Any "inflight" messages written by older library versions will fail to deserialize correctly after the upgrade. Not sure how these types of “breaking” changes are typically handled. 1. Is it simply a matter of noting this in the relevant upgrade doc, i.e. users need to do an application-reset? 2. Or do we want to write more code to handle upgrade scenarios? 3. Or find a more backwards-compatible way of writing this serde?
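The new `serialize` body frames both values in one byte array: a 4-byte length of the new value, the new value's bytes, then the old value's bytes, with a null side written as zero bytes. A minimal Java sketch of that layout and its inverse follows, assuming `NEW_DATA_LENGTH_BYTES_SIZE` equals `Integer.BYTES` (the diff writes the length with `putInt`); `ChangeFrameSketch`, `encode` and `decode` are hypothetical names, not the PR's `ChangedDeserializer`.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the framing in the new ChangedSerializer#serialize:
// [4-byte length of new value][new value bytes][old value bytes]
final class ChangeFrameSketch {
    // Assumption: the length prefix is a single int, matching putInt in the diff.
    static final int NEW_DATA_LENGTH_BYTES_SIZE = Integer.BYTES;

    static byte[] encode(byte[] newData, byte[] oldData) {
        if (newData == null && oldData == null) {
            // Mirrors the StreamsException case: at least one side must be present.
            throw new IllegalArgumentException("Both old and new values are null");
        }
        // A null side is written as zero bytes, as in the diff.
        byte[] newBytes = newData == null ? new byte[0] : newData;
        byte[] oldBytes = oldData == null ? new byte[0] : oldData;
        return ByteBuffer
            .allocate(NEW_DATA_LENGTH_BYTES_SIZE + newBytes.length + oldBytes.length)
            .putInt(newBytes.length)
            .put(newBytes)
            .put(oldBytes)
            .array();
    }

    // Inverse of encode: returns {newBytes, oldBytes}; an absent side comes back empty.
    static byte[][] decode(byte[] frame) {
        ByteBuffer buffer = ByteBuffer.wrap(frame);
        byte[] newBytes = new byte[buffer.getInt()];
        buffer.get(newBytes);
        byte[] oldBytes = new byte[buffer.remaining()];
        buffer.get(oldBytes);
        return new byte[][] {newBytes, oldBytes};
    }
}
```

Two consequences follow from this layout as sketched: an empty serialized value cannot be told apart from a null one, and bytes written in any other layout, such as by an older library version, would have their first four bytes misread as a length, which is the upgrade concern raised in the comment.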
[GitHub] [kafka] fqaiser94 commented on a change in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution
fqaiser94 commented on a change in pull request #10747: URL: https://github.com/apache/kafka/pull/10747#discussion_r637657442 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java ## @@ -45,34 +45,30 @@ public void setIfUnset(final Serializer defaultKeySerializer, final Serial } /** - * @throws StreamsException if both old and new values of data are null, or if - * both values are not null + * @throws StreamsException if both old and new values of data are null. */ @Override public byte[] serialize(final String topic, final Headers headers, final Change data) { -final byte[] serializedKey; +final boolean oldValueIsNull = data.oldValue == null; +final boolean newValueIsNull = data.newValue == null; -// only one of the old / new values would be not null -if (data.newValue != null) { -if (data.oldValue != null) { -throw new StreamsException("Both old and new values are not null (" + data.oldValue -+ " : " + data.newValue + ") in ChangeSerializer, which is not allowed."); -} - -serializedKey = inner.serialize(topic, headers, data.newValue); +// both old and new values cannot be null +if (oldValueIsNull && newValueIsNull) { +throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed."); } else { -if (data.oldValue == null) { -throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed."); -} - -serializedKey = inner.serialize(topic, headers, data.oldValue); +final byte[] newData = newValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.newValue); +final byte[] oldData = oldValueIsNull ? 
new byte[0] : inner.serialize(topic, headers, data.oldValue); + +final int newDataLength = newData.length; +final int capacity = NEW_DATA_LENGTH_BYTES_SIZE + newDataLength + oldData.length; + +return ByteBuffer +.allocate(capacity) +.putInt(newDataLength) +.put(newData) +.put(oldData) +.array(); Review comment: With the change to the `ChangedSerializer` and `ChangedDeserializer` classes, I don’t think users will be able to just upgrade from a previous version of Kafka Streams easily. Any "inflight" messages written by older library versions will fail to deserialize correctly after the upgrade. Not sure how these types of “breaking” changes are typically handled. 1. Is it simply a matter of noting this in the relevant upgrade doc? 2. Or do we want to write more code to handle upgrade scenarios? 3. Or find a more backwards-compatible way of writing this serde?
[GitHub] [kafka] cmccabe opened a new pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
cmccabe opened a new pull request #10753: URL: https://github.com/apache/kafka/pull/10753 Support the KIP-455 reassignment API when in KRaft mode. Reassignments which merely rearrange replicas complete immediately. Those that only remove replicas complete immediately if the ISR would be non-empty after the specified removals. Reassignments that add one or more replicas follow the KIP-455 pattern of adding all the adding replicas to the replica set, and then waiting for the ISR to include all the new replicas before completing. Changes to the replica sets are accomplished via PartitionChangeRecord. Add support for the ReassigningPartitions metric, which tracks the number of partitions which are currently reassigning. This metric is only exposed when running in standalone mode, to avoid conflicting with the broker metric. In TimelineInteger and TimelineLong, replace increment with incrementAndGet, and add an addAndGet function. Do the same for decrement / subtraction. This makes updating metrics more convenient.
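The completion rules in this PR description can be expressed as a small classification over the current replicas, the target replicas and the ISR. The Java sketch below is hypothetical (`ReassignmentSketch` and `Outcome` are invented names), reads the rules as applying to replicas, per KIP-455, and does not model PartitionChangeRecord or the ReassigningPartitions metric.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical classification of a KIP-455 style reassignment request.
final class ReassignmentSketch {
    enum Outcome { COMPLETE_IMMEDIATELY, ADD_REPLICAS_THEN_WAIT_FOR_ISR, ISR_WOULD_BE_EMPTY }

    static Outcome classify(List<Integer> currentReplicas, List<Integer> targetReplicas, Set<Integer> isr) {
        Set<Integer> adding = new HashSet<>(targetReplicas);
        adding.removeAll(currentReplicas);
        Set<Integer> removing = new HashSet<>(currentReplicas);
        removing.removeAll(targetReplicas);

        if (!adding.isEmpty()) {
            // Add the new replicas first; the reassignment completes once the ISR contains them all.
            return Outcome.ADD_REPLICAS_THEN_WAIT_FOR_ISR;
        }
        if (removing.isEmpty()) {
            // Same replica set, only the order changed.
            return Outcome.COMPLETE_IMMEDIATELY;
        }
        Set<Integer> isrAfterRemoval = new HashSet<>(isr);
        isrAfterRemoval.removeAll(removing);
        // The description only says pure removals complete immediately when the ISR stays
        // non-empty; it does not say what happens otherwise, so this sketch just flags it.
        return isrAfterRemoval.isEmpty() ? Outcome.ISR_WOULD_BE_EMPTY : Outcome.COMPLETE_IMMEDIATELY;
    }
}
```

For example, moving from replicas [1, 2, 3] to [3, 1, 2] completes immediately, while [1, 2, 3] to [1, 2, 4] must first add replica 4 and wait for it to join the ISR.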
[GitHub] [kafka] mumrah opened a new pull request #10752: KAFKA-12620 Allocate Producer IDs in KRaft controller
mumrah opened a new pull request #10752: URL: https://github.com/apache/kafka/pull/10752 This is part 2 of [KIP-730](https://cwiki.apache.org/confluence/display/KAFKA/KIP-730%3A+Producer+ID+generation+in+KRaft+mode); part 1 was in #10504. This PR adds support on the KRaft controller for handling AllocateProducerIDs requests, managing the state of the latest producer ID block in the controller, and committing this state to the metadata log.
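Conceptually, the controller only has to remember where the next producer ID block starts and hand out consecutive, non-overlapping ranges. A hedged Java sketch follows; the class name, the `allocate` method and the block size of 1000 are assumptions for illustration, and the "commit this state to the metadata log" step is reduced to updating a field.

```java
// Hypothetical sketch of controller-side producer ID block allocation.
final class ProducerIdBlockSketch {
    // Assumed block size, for illustration only.
    static final long BLOCK_SIZE = 1000L;

    static final class Block {
        final int brokerId;
        final long firstProducerId;
        final long size;

        Block(int brokerId, long firstProducerId, long size) {
            this.brokerId = brokerId;
            this.firstProducerId = firstProducerId;
            this.size = size;
        }

        long lastProducerId() {
            return firstProducerId + size - 1;
        }
    }

    // The only state needed: the first ID of the next block to hand out.
    // In the PR this state is committed to the metadata log; here it is just a field.
    private long nextProducerId;

    ProducerIdBlockSketch(long nextProducerId) {
        this.nextProducerId = nextProducerId;
    }

    Block allocate(int brokerId) {
        if (nextProducerId > Long.MAX_VALUE - BLOCK_SIZE) {
            throw new IllegalStateException("Exhausted all producer IDs");
        }
        Block block = new Block(brokerId, nextProducerId, BLOCK_SIZE);
        nextProducerId += BLOCK_SIZE; // blocks never overlap
        return block;
    }

    long nextProducerId() {
        return nextProducerId;
    }
}
```

Starting from 0, a first request from broker 1 receives IDs 0 to 999, a second request from broker 2 receives 1000 to 1999, and the controller's persisted state becomes 2000.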
[GitHub] [kafka] junrao merged pull request #10742: MINOR: Add log identifier/prefix printing in Log layer static functions
junrao merged pull request #10742: URL: https://github.com/apache/kafka/pull/10742
[jira] [Commented] (KAFKA-8522) Tombstones can survive forever
[ https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350551#comment-17350551 ] Jun Rao commented on KAFKA-8522: latest PR link > Tombstones can survive forever > -- > > Key: KAFKA-8522 > URL: https://issues.apache.org/jira/browse/KAFKA-8522 > Project: Kafka > Issue Type: Improvement > Components: log cleaner >Reporter: Evelyn Bayes >Priority: Minor > > This is a bit of a grey zone as to whether it's a "bug", but it is certainly > unintended behaviour. > > Under specific conditions tombstones effectively survive forever: > * Small amount of throughput; > * min.cleanable.dirty.ratio near or at 0; and > * Other parameters at default. > What happens is all the data continuously gets cycled into the oldest > segment. Old records get compacted away, but the new records continuously > update the timestamp of the oldest segment, resetting the countdown for > deleting tombstones. > So tombstones build up in the oldest segment forever. > > While you could "fix" this by reducing the segment size, this can be > undesirable as a sudden change in throughput could cause a dangerous number > of segments to be created.
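The mechanism described in KAFKA-8522 can be modelled in a few lines: tombstones in the oldest segment become removable only after that segment has gone `delete.retention.ms` without being rewritten, and every cleaning pass that merges new records into it resets the clock. The Java sketch below is a deliberately simplified, hypothetical model, not the log cleaner's actual delete-horizon computation; 86,400,000 ms is the default `delete.retention.ms` of 24 hours.

```java
// Hypothetical, simplified model of the oldest segment's tombstone retention clock.
final class TombstoneHorizonSketch {
    // Last-modified time of the oldest segment, in milliseconds.
    private long oldestSegmentLastModifiedMs;

    TombstoneHorizonSketch(long createdMs) {
        this.oldestSegmentLastModifiedMs = createdMs;
    }

    // A cleaning pass that merges newer records into the oldest segment rewrites it,
    // which refreshes its last-modified time.
    void cleanInto(long nowMs) {
        oldestSegmentLastModifiedMs = nowMs;
    }

    // Tombstones may be dropped only once the segment has not been rewritten
    // for at least deleteRetentionMs.
    boolean tombstonesDroppable(long nowMs, long deleteRetentionMs) {
        return nowMs - oldestSegmentLastModifiedMs >= deleteRetentionMs;
    }

    // Simulates steady low throughput: every intervalMs the cleaner first checks the
    // tombstones, then merges fresh records into the oldest segment.
    static boolean everDroppable(long deleteRetentionMs, long intervalMs, long durationMs) {
        TombstoneHorizonSketch segment = new TombstoneHorizonSketch(0L);
        for (long now = intervalMs; now <= durationMs; now += intervalMs) {
            if (segment.tombstonesDroppable(now, deleteRetentionMs)) {
                return true;
            }
            segment.cleanInto(now);
        }
        return false;
    }
}
```

With records merged in every minute, `everDroppable(86_400_000L, 60_000L, 30L * 86_400_000L)` stays `false` over a simulated 30 days, matching the ticket's "survive forever"; once merges stop, the same tombstones become droppable 24 hours after the last rewrite.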
[GitHub] [kafka] dejan2609 commented on pull request #10698: KAFKA-12770: introduce `checkstyleVersion` build option (for overriding CheckStyle project-defined dependency version)
dejan2609 commented on pull request #10698: URL: https://github.com/apache/kafka/pull/10698#issuecomment-847201180 The CheckStyle team (@romani) needs this in order to add the Kafka project to their regression suite here: https://github.com/checkstyle/contribution/blob/master/checkstyle-tester/projects-to-test-on.properties @showuon was kind enough to provide his assistance, but we now need approval from someone who has write access (@mumrah, @guozhangwang, @hachikuji, @ijuma, @junrao, @cmccabe or someone else). Note: the changes are small and simple (and carry no risk).
[GitHub] [kafka] dejan2609 edited a comment on pull request #10698: KAFKA-12770: introduce `checkstyleVersion` build option (for overriding CheckStyle project-defined dependency version)
dejan2609 edited a comment on pull request #10698: URL: https://github.com/apache/kafka/pull/10698#issuecomment-847174293 If I may ask you @showuon: do we need to ping someone else for another review (or for a merge into trunk)? _Edit (just to answer myself): there are two types of approvals, and hence this PR does need additional review(s)... in any case: thanks @showuon!_
[GitHub] [kafka] dejan2609 commented on pull request #10698: KAFKA-12770: introduce `checkstyleVersion` build option (for overriding CheckStyle project-defined dependency version)
dejan2609 commented on pull request #10698: URL: https://github.com/apache/kafka/pull/10698#issuecomment-847174293 If I may ask you @showuon: do we need to ping someone else for another review (or for a merge into trunk)?
[GitHub] [kafka] dengziming commented on pull request #10751: MINOR: update java doc for ConsumerCoordinator
dengziming commented on pull request #10751: URL: https://github.com/apache/kafka/pull/10751#issuecomment-847135527 iif means if and only if
[GitHub] [kafka] vvcephei commented on pull request #10744: KAFKA-8410: KTableProcessor migration groundwork
vvcephei commented on pull request #10744: URL: https://github.com/apache/kafka/pull/10744#issuecomment-847118811 Filed ticket for Connect test: https://issues.apache.org/jira/browse/KAFKA-12842 Commented on ticket for Raft test: https://issues.apache.org/jira/browse/KAFKA-12629
[jira] [Updated] (KAFKA-12629) Failing Test: RaftClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-12629: - Summary: Failing Test: RaftClusterTest (was: Flaky Test RaftClusterTest) > Failing Test: RaftClusterTest > - > > Key: KAFKA-12629 > URL: https://issues.apache.org/jira/browse/KAFKA-12629 > Project: Kafka > Issue Type: Test > Components: core, unit tests >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Blocker > Labels: flaky-test > Fix For: 3.0.0 > > > {quote} {{java.util.concurrent.ExecutionException: > java.lang.ClassNotFoundException: > org.apache.kafka.controller.NoOpSnapshotWriterBuilder > at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364) > at > kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}
[jira] [Comment Edited] (KAFKA-12629) Flaky Test RaftClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350483#comment-17350483 ] John Roesler edited comment on KAFKA-12629 at 5/24/21, 3:22 PM: Also failed on: [https://github.com/apache/kafka/pull/10744/checks?check_run_id=2643450959] When this test fails in a PR build, it means that it failed twice in a row. I'm inclined to merge [~showuon]'s PR. Perpetually failing tests provide negative value to the codebase. If we can't be bothered to fix the test, then we obviously don't care about the logic under test. Rather than do anything rash, I've upgraded this ticket and the proposed cause (KAFKA-12677) to blockers for 3.0. If a lot more time passes with no action here, I think we should go ahead and ignore the tests. The blocker tickets should be sufficient to get this looked at for 3.0. {noformat} java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:197) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at org.junit.jupiter.engine.extension.TimeoutInvocation.proceed(TimeoutInvocation.java:46) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) at org.junit.platform.engine.support.hierarchical
[jira] [Updated] (KAFKA-12677) The raftCluster always send to the wrong active controller and never update
[ https://issues.apache.org/jira/browse/KAFKA-12677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-12677: - Fix Version/s: 3.0.0 > The raftCluster always send to the wrong active controller and never update > --- > > Key: KAFKA-12677 > URL: https://issues.apache.org/jira/browse/KAFKA-12677 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.0 >Reporter: Luke Chen >Assignee: Colin McCabe >Priority: Blocker > Fix For: 3.0.0 > > > KIP-500 introduces a Self-Managed Metadata Quorum. There should > always be exactly one active controller, and all RPCs should be sent to the active > controller. However, the active controller may already have changed > while RPCs are still being sent to the old one. > In the attached log, we can see: > {code:java} > [Controller 3002] Becoming active at controller epoch 1. > ... > [Controller 3000] Becoming active at controller epoch 2. > {code} > So, the latest active controller should be 3000. But the create topic RPCs are > all being sent to controller 3002: > {code:java} > "errorMessage":"The active controller appears to be node 3000" > {code} > This bug makes the RaftClusterTest tests flaky. > > Debug log while running testCreateClusterAndCreateListDeleteTopic test: > https://drive.google.com/file/d/1WVUgy1Erjx8mHyofiP9MVvQGb0LcDYt3/view?usp=sharing
[jira] [Updated] (KAFKA-12677) The raftCluster always send to the wrong active controller and never update
[ https://issues.apache.org/jira/browse/KAFKA-12677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-12677: - Priority: Blocker (was: Major) > The raftCluster always send to the wrong active controller and never update > --- > > Key: KAFKA-12677 > URL: https://issues.apache.org/jira/browse/KAFKA-12677 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.0 >Reporter: Luke Chen >Assignee: Colin McCabe >Priority: Blocker > > KIP-500 introduces a Self-Managed Metadata Quorum. There should > always be exactly one active controller, and all RPCs should be sent to the active > controller. However, the active controller may already have changed > while RPCs are still being sent to the old one. > In the attached log, we can see: > {code:java} > [Controller 3002] Becoming active at controller epoch 1. > ... > [Controller 3000] Becoming active at controller epoch 2. > {code} > So, the latest active controller should be 3000. But the create topic RPCs are > all being sent to controller 3002: > {code:java} > "errorMessage":"The active controller appears to be node 3000" > {code} > This bug makes the RaftClusterTest tests flaky. > > Debug log while running testCreateClusterAndCreateListDeleteTopic test: > https://drive.google.com/file/d/1WVUgy1Erjx8mHyofiP9MVvQGb0LcDYt3/view?usp=sharing
[jira] [Updated] (KAFKA-12629) Flaky Test RaftClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-12629: - Priority: Blocker (was: Critical) > Flaky Test RaftClusterTest > -- > > Key: KAFKA-12629 > URL: https://issues.apache.org/jira/browse/KAFKA-12629 > Project: Kafka > Issue Type: Test > Components: core, unit tests >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Blocker > Labels: flaky-test > > {quote} {{java.util.concurrent.ExecutionException: > java.lang.ClassNotFoundException: > org.apache.kafka.controller.NoOpSnapshotWriterBuilder > at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364) > at > kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}
[jira] [Updated] (KAFKA-12629) Flaky Test RaftClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-12629: - Fix Version/s: 3.0.0 > Flaky Test RaftClusterTest > -- > > Key: KAFKA-12629 > URL: https://issues.apache.org/jira/browse/KAFKA-12629 > Project: Kafka > Issue Type: Test > Components: core, unit tests >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Blocker > Labels: flaky-test > Fix For: 3.0.0 > > > {quote} {{java.util.concurrent.ExecutionException: > java.lang.ClassNotFoundException: > org.apache.kafka.controller.NoOpSnapshotWriterBuilder > at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364) > at > kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}
[jira] [Commented] (KAFKA-12629) Flaky Test RaftClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350483#comment-17350483 ] John Roesler commented on KAFKA-12629: -- Failed also on: [https://github.com/apache/kafka/pull/10744/checks?check_run_id=2643450959] When this build fails in a PR build, it means that it failed twice in a row. I'm inclined to merge [~showuon] 's PR. Perpetually failing tests provide negative value to the codebase. If we can't bother to fix the test, then we obviously don't care about the logic under test. {noformat} java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:197) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at org.junit.jupiter.engine.extension.TimeoutInvocation.proceed(TimeoutInvocation.java:46) at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) at java.base/java.util.ArrayList.forEach(ArrayList.java
[jira] [Created] (KAFKA-12842) Failing test: org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic
John Roesler created KAFKA-12842: Summary: Failing test: org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic Key: KAFKA-12842 URL: https://issues.apache.org/jira/browse/KAFKA-12842 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: John Roesler Fix For: 3.0.0 This test failed during a PR build, which means that it failed twice in a row, due to the test-retry logic in PR builds. [https://github.com/apache/kafka/pull/10744/checks?check_run_id=2643417209] {noformat} java.lang.NullPointerException at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) at org.reflections.Store.getAllIncluding(Store.java:82) at org.reflections.Store.getAll(Store.java:93) at org.reflections.Reflections.getSubTypesOf(Reflections.java:404) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:352) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:337) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216) at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209) at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61) at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:93) at org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50) at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:174) at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.startConnect(EmbeddedConnectCluster.java:260) at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.start(EmbeddedConnectCluster.java:141) at 
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic(ConnectWorkerIntegrationTest.java:303) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at sun.reflect.Gene
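Both failure reports rely on the same observation. With test retries enabled in PR builds, a reported failure means every attempt failed. Below is a minimal sketch of that rule. It assumes a fixed maximum number of attempts; the real limit is configured in the Gradle build and is not shown here. `RetryingRunner` is hypothetical.

```java
import java.util.function.BooleanSupplier;

// Hypothetical runner illustrating a "retry on failure" CI policy.
final class RetryingRunner {
    private RetryingRunner() { }

    /**
     * Runs a test up to maxAttempts times and reports failure only if every
     * attempt fails.
     */
    static boolean passesWithRetries(BooleanSupplier test, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (test.getAsBoolean()) {
                return true; // one passing attempt is enough to mark the test green
            }
        }
        return false; // failed on every attempt, e.g. "twice in a row" when maxAttempts == 2
    }
}
```

This is also why a test that fails once and then passes stays invisible in PR builds.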
[GitHub] [kafka] sasukerui opened a new pull request #10751: MINOR: update java doc for ConsumerCoordinator
sasukerui opened a new pull request #10751: URL: https://github.com/apache/kafka/pull/10751 fix typo -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on pull request #10552: URL: https://github.com/apache/kafka/pull/10552#issuecomment-847042383 @vahidhashemian, thanks for your comments. I've updated the PR. Please take another look. Thank you.
[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r637943535 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -80,9 +80,7 @@ public MemberData(List partitions, Optional generation) log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the " + "general case assignment algorithm"); Review comment: Updated. Thanks. ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -469,73 +426,190 @@ private boolean allSubscriptionsEqual(Set allTopics, TreeSet sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment)); sortedCurrentSubscriptions.addAll(currentAssignment.keySet()); -balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, -consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired); +balance(currentAssignment, prevAssignment, sortedAllPartitions, unassignedPartitions, sortedCurrentSubscriptions, +consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired, +partitionsPerTopic, totalPartitionsCount); + +if (log.isDebugEnabled()) { +log.debug("final assignment: {}", currentAssignment); +} + return currentAssignment; } +/** + * get the unassigned partition list by computing the difference set of the sortedPartitions(all partitions) + * and sortedAssignedPartitions. If no assigned partitions, we'll just return all sorted topic partitions. 
+ * This is used in generalAssign method + * + * We loop the sortedPartition, and compare the ith element in sortedAssignedPartitions(i start from 0): + * - if not equal to the ith element, add to unassignedPartitions + * - if equal to the the ith element, get next element from sortedAssignedPartitions + * + * @param sortedAllPartitions: sorted all partitions + * @param sortedAssignedPartitions: sorted partitions, all are included in the sortedPartitions + * @param topic2AllPotentialConsumers: topics mapped to all consumers that subscribed to it + * @return the partitions don't assign to any current consumers Review comment: Updated. Thanks.
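The Javadoc above describes a single linear pass over two sorted lists. Below is a minimal sketch of that difference step. It uses a generic `Comparable` element type in place of `TopicPartition`. The real method also takes `topic2AllPotentialConsumers`, which this sketch omits. `SortedDifference` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: set difference of two lists sorted in the same order.
final class SortedDifference {
    private SortedDifference() { }

    /**
     * Returns the elements of sortedAll that are not in sortedAssigned.
     * Both lists must be sorted in the same order, and sortedAssigned must be
     * a subset of sortedAll, so a single index into sortedAssigned suffices.
     */
    static <T extends Comparable<T>> List<T> unassigned(List<T> sortedAll, List<T> sortedAssigned) {
        if (sortedAssigned.isEmpty()) {
            return new ArrayList<>(sortedAll); // nothing assigned yet: everything is unassigned
        }
        List<T> result = new ArrayList<>();
        int i = 0; // index of the next assigned element we expect to meet
        for (T element : sortedAll) {
            if (i < sortedAssigned.size() && element.compareTo(sortedAssigned.get(i)) == 0) {
                i++; // matched the i-th assigned element: skip it and advance
            } else {
                result.add(element); // not owned by any current consumer
            }
        }
        return result;
    }
}
```

This pass is linear in the total number of partitions. It avoids repeated `List.remove(Object)` calls, each of which is itself linear on an `ArrayList`.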
[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r637941574 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java ## @@ -598,6 +555,43 @@ public void testLargeAssignmentAndGroupWithUniformSubscription() { assignor.assign(partitionsPerTopic, subscriptions); } +@Timeout(40) +@Test +public void testLargeAssignmentAndGroupWithNonEqualSubscription() { +// 1 million partitions! +int topicCount = 500; +int partitionCount = 2_000; +int consumerCount = 2_000; + +List topics = new ArrayList<>(); +Map partitionsPerTopic = new HashMap<>(); +for (int i = 0; i < topicCount; i++) { +String topicName = getTopicName(i, topicCount); +topics.add(topicName); +partitionsPerTopic.put(topicName, partitionCount); +} +for (int i = 0; i < consumerCount; i++) { +if (i == consumerCount - 1) { +subscriptions.put(getConsumerName(i, consumerCount), new Subscription(topics.subList(0, 1))); +} else { +subscriptions.put(getConsumerName(i, consumerCount), new Subscription(topics)); +} +} + +Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + Review comment: It can, but this test (`testLargeAssignmentAndGroupWithNonEqualSubscription`) is mainly a **performance** test, not a functional one. Functional behavior should be covered by other tests. The test above, `testLargeAssignmentAndGroupWithUniformSubscription`, serves the same purpose.
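The test sizes work out to 500 topics times 2,000 partitions, or 1,000,000 partitions. Every consumer except the last subscribes to all topics. Below is a small sketch of that subscription shape using plain collections instead of `Subscription`. The class name and the `topicN`/`consumerN` naming scheme are hypothetical; the real test uses `getTopicName` and `getConsumerName`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reconstruction of the test's input shape, without Kafka types.
final class NonEqualSubscriptionSetup {
    private NonEqualSubscriptionSetup() { }

    /** Builds consumer -> subscribed topics, where only the last consumer differs. */
    static Map<String, List<String>> subscriptions(int topicCount, int consumerCount) {
        List<String> topics = new ArrayList<>(topicCount);
        for (int i = 0; i < topicCount; i++) {
            topics.add("topic" + i); // hypothetical naming
        }
        Map<String, List<String>> subscriptions = new HashMap<>();
        for (int i = 0; i < consumerCount; i++) {
            // The last consumer subscribes to the first topic only. That makes the
            // subscriptions non-equal, so the assignor falls back to the general case.
            List<String> subscribed = (i == consumerCount - 1) ? topics.subList(0, 1) : topics;
            subscriptions.put("consumer" + i, subscribed);
        }
        return subscriptions;
    }

    static long totalPartitions(int topicCount, int partitionCount) {
        return (long) topicCount * partitionCount;
    }
}
```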
[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r637939950 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -384,37 +326,39 @@ private boolean allSubscriptionsEqual(Set allTopics, * * @param partitionsPerTopic The number of partitions for each subscribed topic. * @param subscriptions Map from the member id to their respective topic subscription + * @param currentAssignment Each consumer's previously owned and still-subscribed partitions * * @return Map from each member to the list of partitions assigned to them. */ private Map> generalAssign(Map partitionsPerTopic, -Map subscriptions) { -Map> currentAssignment = new HashMap<>(); +Map subscriptions, +Map> currentAssignment) { +if (log.isDebugEnabled()) { +log.debug("performing general assign. partitionsPerTopic: {}, subscriptions: {}, currentAssignment: {}", +partitionsPerTopic, subscriptions, currentAssignment); +} + Map prevAssignment = new HashMap<>(); partitionMovements = new PartitionMovements(); -prepopulateCurrentAssignments(subscriptions, currentAssignment, prevAssignment); +prepopulateCurrentAssignments(subscriptions, prevAssignment); -// a mapping of all topic partitions to all consumers that can be assigned to them -final Map> partition2AllPotentialConsumers = new HashMap<>(); -// a mapping of all consumers to all potential topic partitions that can be assigned to them -final Map> consumer2AllPotentialPartitions = new HashMap<>(); +// a mapping of all topics to all consumers that can be assigned to them +final Map> topic2AllPotentialConsumers = new HashMap<>(partitionsPerTopic.keySet().size()); +// a mapping of all consumers to all potential topics that can be assigned to them +final Map> consumer2AllPotentialTopics = new HashMap<>(subscriptions.keySet().size()); -// initialize partition2AllPotentialConsumers and consumer2AllPotentialPartitions in the following two for loops 
-for (Entry entry: partitionsPerTopic.entrySet()) { -for (int i = 0; i < entry.getValue(); ++i) -partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<>()); -} +// initialize topic2AllPotentialConsumers and consumer2AllPotentialTopics +partitionsPerTopic.keySet().stream().forEach( +topicName -> topic2AllPotentialConsumers.put(topicName, new ArrayList<>())); for (Entry entry: subscriptions.entrySet()) { String consumerId = entry.getKey(); -consumer2AllPotentialPartitions.put(consumerId, new ArrayList<>()); +List subscribedTopics = new ArrayList<>(entry.getValue().topics().size()); +consumer2AllPotentialTopics.put(consumerId, subscribedTopics); entry.getValue().topics().stream().filter(topic -> partitionsPerTopic.get(topic) != null).forEach(topic -> { -for (int i = 0; i < partitionsPerTopic.get(topic); ++i) { -TopicPartition topicPartition = new TopicPartition(topic, i); - consumer2AllPotentialPartitions.get(consumerId).add(topicPartition); - partition2AllPotentialConsumers.get(topicPartition).add(consumerId); -} +subscribedTopics.add(topic); Review comment: No, it just creates an empty List with an initial capacity of `topics().size()`. We can't create the List with all topics directly, because we need to filter out topics that are not in `partitionsPerTopic`.
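The refactor replaces the partition-level maps with topic-level ones. As the reply explains, topics missing from `partitionsPerTopic` are filtered out, so each per-consumer list is only pre-sized, not pre-filled. Below is a minimal sketch of that initialization. For self-containment it uses plain `Map<String, List<String>>` subscriptions in place of `Subscription`. The class name is hypothetical. The step that adds the consumer to each topic's list is assumed from the map's stated purpose, since the hunk is cut off before it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the topic-level maps built in generalAssign.
final class PotentialConsumerMaps {
    final Map<String, List<String>> topic2AllPotentialConsumers;
    final Map<String, List<String>> consumer2AllPotentialTopics;

    PotentialConsumerMaps(Map<String, Integer> partitionsPerTopic,
                          Map<String, List<String>> subscriptions) {
        topic2AllPotentialConsumers = new HashMap<>(partitionsPerTopic.size());
        consumer2AllPotentialTopics = new HashMap<>(subscriptions.size());

        // Every known topic starts with an empty list of potential consumers.
        for (String topic : partitionsPerTopic.keySet()) {
            topic2AllPotentialConsumers.put(topic, new ArrayList<>());
        }
        for (Map.Entry<String, List<String>> entry : subscriptions.entrySet()) {
            String consumerId = entry.getKey();
            // The capacity is only a sizing hint: the list starts empty.
            List<String> subscribedTopics = new ArrayList<>(entry.getValue().size());
            consumer2AllPotentialTopics.put(consumerId, subscribedTopics);
            for (String topic : entry.getValue()) {
                if (partitionsPerTopic.get(topic) == null) {
                    continue; // subscribed topic with no partition metadata: skip it
                }
                subscribedTopics.add(topic);
                topic2AllPotentialConsumers.get(topic).add(consumerId);
            }
        }
    }
}
```

Keying by topic rather than by `TopicPartition` makes the map size scale with the number of topics. In the performance test above, that is 500 entries instead of 1,000,000.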
[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r637936879 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -444,23 +392,32 @@ private boolean allSubscriptionsEqual(Set allTopics, // otherwise (the consumer still exists) for (Iterator partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) { TopicPartition partition = partitionIter.next(); -if (!partition2AllPotentialConsumers.containsKey(partition)) { -// if this topic partition of this consumer no longer exists remove it from currentAssignment of the consumer +if (!topic2AllPotentialConsumers.containsKey(partition.topic())) { +// if this topic partition of this consumer no longer exists, remove it from currentAssignment of the consumer partitionIter.remove(); currentPartitionConsumer.remove(partition); -} else if (!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) { -// if this partition cannot remain assigned to its current consumer because the consumer -// is no longer subscribed to its topic remove it from currentAssignment of the consumer +} else if (!consumerSubscription.topics().contains(partition.topic())) { +// because the consumer is no longer subscribed to its topic, remove it from currentAssignment of the consumer partitionIter.remove(); revocationRequired = true; -} else +} else { // otherwise, remove the topic partition from those that need to be assigned only if // its current consumer is still subscribed to its topic (because it is already assigned // and we would want to preserve that assignment as much as possible) -unassignedPartitions.remove(partition); +assignedPartitions.add(partition); +} } } } + +// all partitions that needed to be assigned +List unassignedPartitions = getUnassignedPartitions(sortedAllPartitions, assignedPartitions, topic2AllPotentialConsumers); +assignedPartitions = null; Review comment: Yes, it just tells 
the GC that this memory can be freed, to avoid OOM. I know that by this step we should already have allocated all the memory we need, but it's just in case. What do you think?
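The loop in this hunk sorts each previously owned partition into one of three outcomes. Below is a simplified sketch of that classification. It uses the partition's topic name and a hypothetical `Outcome` enum. It is not the assignor's code, which also updates `currentPartitionConsumer` and collects `assignedPartitions`.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical classifier mirroring the three branches in the diff.
final class OwnedPartitionClassifier {
    enum Outcome {
        DROP_TOPIC_GONE,     // topic no longer exists: remove from the current assignment
        REVOKE_UNSUBSCRIBED, // consumer no longer subscribed: remove and mark revocation required
        KEEP_ASSIGNED        // still valid: keep it to preserve stickiness
    }

    private OwnedPartitionClassifier() { }

    static Outcome classify(String partitionTopic,
                            Map<String, ?> topic2AllPotentialConsumers,
                            Set<String> consumerSubscribedTopics) {
        if (!topic2AllPotentialConsumers.containsKey(partitionTopic)) {
            return Outcome.DROP_TOPIC_GONE;
        }
        if (!consumerSubscribedTopics.contains(partitionTopic)) {
            return Outcome.REVOKE_UNSUBSCRIBED;
        }
        return Outcome.KEEP_ASSIGNED;
    }
}
```

Setting `assignedPartitions = null` afterwards only drops that local reference. The list becomes eligible for collection once nothing else references it.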
[jira] [Commented] (KAFKA-12333) KafkaMetadataLog and MockLock should validate that appended epochs are monotonically
[ https://issues.apache.org/jira/browse/KAFKA-12333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350413#comment-17350413 ] loboxu commented on KAFKA-12333: [~jsancio] The problem seems to have been fixed, right? > KafkaMetadataLog and MockLock should validate that appended epochs are > monotonically > > > Key: KAFKA-12333 > URL: https://issues.apache.org/jira/browse/KAFKA-12333 > Project: Kafka > Issue Type: Sub-task > Components: replication >Reporter: Jose Armando Garcia Sancio >Assignee: loboxu >Priority: Major > > Both the MockLog and KafkaMetadataLog should only allow appendAsLeader and > appendAsFollower with monotonically increasing epochs. In other words, the > following test in KafkaMetadataLogTest should fail: > {code:java} > @Test > def testOutOfOrderEpoch(): Unit = { > val topicPartition = new TopicPartition("cluster-metadata", 0) > val log = buildMetadataLog(tempDir, mockTime, topicPartition) > val recordFoo = new SimpleRecord("foo".getBytes()) > val currentEpoch = 3 > val initialOffset = log.endOffset().offset > log.appendAsLeader( > MemoryRecords.withRecords(initialOffset, CompressionType.NONE, > currentEpoch, recordFoo), > currentEpoch > ) > // Out-of-order epoch should throw an exception > log.appendAsLeader( > MemoryRecords.withRecords( > initialOffset + 1, CompressionType.NONE, currentEpoch - 1, recordFoo > ), > currentEpoch - 1 > ) > log.appendAsFollower( > MemoryRecords.withRecords( > initialOffset + 2, CompressionType.NONE, currentEpoch - 2, recordFoo > ) > ) > } {code} > The same applies to MockLogTest.
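The ticket asks both logs to reject appends whose epoch goes backwards. Below is a minimal sketch of that check. It assumes the rule is "an epoch must not be lower than the last appended epoch". Equal epochs must stay legal, because a leader appends many batches within one epoch. `EpochCheckedLog` is hypothetical. It stands in for neither `KafkaMetadataLog` nor `MockLog`, and it folds the leader and follower paths into one `append` method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical log that enforces non-decreasing batch epochs.
final class EpochCheckedLog {
    private final List<Integer> batchEpochs = new ArrayList<>();

    /** Appends a batch written in the given epoch, rejecting out-of-order epochs. */
    void append(int epoch) {
        if (!batchEpochs.isEmpty()) {
            int lastEpoch = batchEpochs.get(batchEpochs.size() - 1);
            if (epoch < lastEpoch) {
                throw new IllegalArgumentException(
                    "Epoch " + epoch + " is lower than the last appended epoch " + lastEpoch);
            }
        }
        batchEpochs.add(epoch);
    }

    long endOffset() {
        return batchEpochs.size(); // one record per batch in this sketch
    }
}
```

In the ticket's scenario, an append at epoch 3 followed by an append at epoch 2 would throw, and the log end offset would not advance.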
[GitHub] [kafka] fqaiser94 commented on a change in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution
fqaiser94 commented on a change in pull request #10747: URL: https://github.com/apache/kafka/pull/10747#discussion_r637657442 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java ## @@ -45,34 +45,30 @@ public void setIfUnset(final Serializer defaultKeySerializer, final Serial } /** - * @throws StreamsException if both old and new values of data are null, or if - * both values are not null + * @throws StreamsException if both old and new values of data are null. */ @Override public byte[] serialize(final String topic, final Headers headers, final Change data) { -final byte[] serializedKey; +final boolean oldValueIsNull = data.oldValue == null; +final boolean newValueIsNull = data.newValue == null; -// only one of the old / new values would be not null -if (data.newValue != null) { -if (data.oldValue != null) { -throw new StreamsException("Both old and new values are not null (" + data.oldValue -+ " : " + data.newValue + ") in ChangeSerializer, which is not allowed."); -} - -serializedKey = inner.serialize(topic, headers, data.newValue); +// both old and new values cannot be null +if (oldValueIsNull && newValueIsNull) { +throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed."); } else { -if (data.oldValue == null) { -throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed."); -} - -serializedKey = inner.serialize(topic, headers, data.oldValue); +final byte[] newData = newValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.newValue); +final byte[] oldData = oldValueIsNull ? 
new byte[0] : inner.serialize(topic, headers, data.oldValue); + +final int newDataLength = newData.length; +final int capacity = NEW_DATA_LENGTH_BYTES_SIZE + newDataLength + oldData.length; + +return ByteBuffer +.allocate(capacity) +.putInt(newDataLength) +.put(newData) +.put(oldData) +.array(); Review comment: With the change to the `ChangedSerializer` and `ChangedDeserializer` classes, I don't think users will be able to simply upgrade from a previous version of Kafka Streams. Any in-flight messages written by older library versions will fail to deserialize correctly. I'm not sure how these kinds of "breaking" changes are typically handled. 1. Is it simply a matter of noting this in the relevant upgrade doc? 2. Or do we want to write more code to handle upgrade scenarios? 3. Or should we find a more backward-compatible way of writing this serde?
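The new serializer writes, in order:

1. A 4-byte length of the serialized new value.
2. The new value's bytes.
3. The old value's bytes.

A null side is written as an empty array. Below is a minimal round-trip sketch of that layout. It assumes `NEW_DATA_LENGTH_BYTES_SIZE` equals `Integer.BYTES` (4) and that the matching deserializer reads the fields back in the same order. `ChangeFraming` is a hypothetical name, and it works on already-serialized bytes rather than wrapping an inner serializer.

```java
import java.nio.ByteBuffer;

// Hypothetical framing helper mirroring the layout in the diff.
final class ChangeFraming {
    static final int NEW_DATA_LENGTH_BYTES_SIZE = Integer.BYTES; // assumption: 4-byte length prefix

    private ChangeFraming() { }

    /** Frames [newLength:int][newData][oldData]; a null side is written as zero bytes. */
    static byte[] frame(byte[] newData, byte[] oldData) {
        if (newData == null && oldData == null) {
            throw new IllegalArgumentException("Both old and new values are null");
        }
        byte[] n = newData == null ? new byte[0] : newData;
        byte[] o = oldData == null ? new byte[0] : oldData;
        return ByteBuffer.allocate(NEW_DATA_LENGTH_BYTES_SIZE + n.length + o.length)
            .putInt(n.length)
            .put(n)
            .put(o)
            .array();
    }

    /** Returns {newData, oldData}; zero-length sides come back as null. */
    static byte[][] unframe(byte[] framed) {
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        int newLength = buffer.getInt();
        byte[] n = new byte[newLength];
        buffer.get(n);
        byte[] o = new byte[buffer.remaining()]; // everything after the new value is the old value
        buffer.get(o);
        return new byte[][] {
            n.length == 0 ? null : n,
            o.length == 0 ? null : o
        };
    }
}
```

Two consequences follow from this layout. First, a record written by an older version uses a different layout, so the new reader would misinterpret its first four bytes as a length. This is the upgrade concern raised in the comment. Second, in this sketch, a value whose inner serializer legitimately produces zero bytes cannot be distinguished from null.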
[GitHub] [kafka] fqaiser94 commented on a change in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution
fqaiser94 commented on a change in pull request #10747: URL: https://github.com/apache/kafka/pull/10747#discussion_r637657657 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java ## @@ -90,14 +90,19 @@ public void process(final K key, final Change change) { // if the selected repartition key or value is null, skip // forward oldPair first, to be consistent with reduce and aggregate -if (oldPair != null && oldPair.key != null && oldPair.value != null) { -context().forward(oldPair.key, new Change<>(null, oldPair.value)); +final boolean oldPairNotNull = oldPair != null && oldPair.key != null && oldPair.value != null; +final boolean newPairNotNull = newPair != null && newPair.key != null && newPair.value != null; +if (oldPairNotNull && newPairNotNull && oldPair.key == newPair.key) { Review comment: As Matthias noted on the mailing list thread, this fix depends on a correct implementation of the `.equals()` method for the key type. Do we need to document this assumption somewhere for users?
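The comment's point is that deciding whether the old and new pairs share a repartition key relies on key equality. In Java, `==` on object references tests identity rather than `equals()`. Two equal keys produced separately would therefore compare as different under `==`. Below is a minimal sketch of the same-key check written with `Objects.equals`. `RepartitionKeyCheck` is hypothetical, and this is not the `KTableRepartitionMap` code.

```java
import java.util.Objects;

// Hypothetical same-key check for old/new repartition pairs.
final class RepartitionKeyCheck {
    private RepartitionKeyCheck() { }

    /** True when both pairs are usable and map to the same repartition key by equals(). */
    static <K> boolean sameKey(K oldKey, Object oldValue, K newKey, Object newValue) {
        boolean oldPairNotNull = oldKey != null && oldValue != null;
        boolean newPairNotNull = newKey != null && newValue != null;
        // Objects.equals delegates to oldKey.equals(newKey), not reference identity.
        return oldPairNotNull && newPairNotNull && Objects.equals(oldKey, newKey);
    }
}
```

This still relies on the key type implementing `equals()` correctly, which is the assumption the comment suggests documenting.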
[GitHub] [kafka] ayoukhananov commented on pull request #10728: NPE from the provided metadata in client callback in case of ApiException.
ayoukhananov commented on pull request #10728:
URL: https://github.com/apache/kafka/pull/10728#issuecomment-846970648

@showuon Thanks for your feedback.
1. The NPE happened in our production environment. I had missed the Jira ticket part and have now added this [Ticket](https://issues.apache.org/jira/browse/KAFKA-12841).
2. Will add Kafka
3. Will try to add tests soon.
[jira] [Created] (KAFKA-12841) NPE from the provided metadata in client callback in case of ApiException
Avi Youkhananov created KAFKA-12841:
---
Summary: NPE from the provided metadata in client callback in case of ApiException
Key: KAFKA-12841
URL: https://issues.apache.org/jira/browse/KAFKA-12841
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.6.0
Environment: Prod
Reporter: Avi Youkhananov
Attachments: NPE.production

1. The org.apache.kafka.clients.producer.Callback interface has a method onCompletion(...), whose documentation says:
The metadata for the record that was sent (i.e. the partition and offset). *An empty metadata with -1 value for all fields* except for topicPartition will be returned if an error occurred.
We got an NPE from the doSend(...) method in org.apache.kafka.clients.producer.KafkaProducer, which can occur when an ApiException is thrown ... In the ApiException case it uses the regular callback instead of the InterceptorCallback, which might also cover the NPE.
2. Moreover, RecordMetadata has a method partition() which returns an int but can also throw an NPE, because the TopicPartition might be null.

Stack trace attached.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
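Point 2 of the report can be illustrated without the client library. The sketch below uses hypothetical stand-ins (`TopicPartitionStub`, `RecordMetadataStub`, `SafeCallbackSketch`), not the real `kafka-clients` classes, and assumes, as the report describes, that `partition()` simply dereferences a possibly-null `TopicPartition`. The callback-side helper checks the exception before reading any metadata field, following the argument order of `onCompletion(metadata, exception)`:

```java
// Hypothetical stand-ins reduced to what the report describes; NOT the kafka-clients classes.
final class TopicPartitionStub {
    private final int partition;

    TopicPartitionStub(int partition) {
        this.partition = partition;
    }

    int partition() {
        return partition;
    }
}

final class RecordMetadataStub {
    private final TopicPartitionStub topicPartition; // null on the error path the report describes

    RecordMetadataStub(TopicPartitionStub topicPartition) {
        this.topicPartition = topicPartition;
    }

    // Returns a primitive int by dereferencing a possibly-null field: NPE if it is null.
    int partition() {
        return topicPartition.partition();
    }

    boolean hasTopicPartition() {
        return topicPartition != null;
    }
}

final class SafeCallbackSketch {
    // Same argument order as Callback.onCompletion(metadata, exception):
    // inspect the exception first, then guard before reading metadata fields.
    static String describe(RecordMetadataStub metadata, Exception exception) {
        if (exception != null) {
            return "send failed: " + exception.getClass().getSimpleName();
        }
        if (metadata == null || !metadata.hasTopicPartition()) {
            return "no partition information";
        }
        return "partition " + metadata.partition();
    }
}
```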
[GitHub] [kafka] fqaiser94 commented on a change in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution
fqaiser94 commented on a change in pull request #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r637657442

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
## @@ -45,34 +45,30 @@ public void setIfUnset(final Serializer defaultKeySerializer, final Serial
}

/**
- * @throws StreamsException if both old and new values of data are null, or if
- * both values are not null
+ * @throws StreamsException if both old and new values of data are null.
*/
@Override
public byte[] serialize(final String topic, final Headers headers, final Change data) {
-final byte[] serializedKey;
+final boolean oldValueIsNull = data.oldValue == null;
+final boolean newValueIsNull = data.newValue == null;

-// only one of the old / new values would be not null
-if (data.newValue != null) {
-if (data.oldValue != null) {
-throw new StreamsException("Both old and new values are not null (" + data.oldValue
-+ " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
-}
-
-serializedKey = inner.serialize(topic, headers, data.newValue);
+// both old and new values cannot be null
+if (oldValueIsNull && newValueIsNull) {
+throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
} else {
-if (data.oldValue == null) {
-throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
-}
-
-serializedKey = inner.serialize(topic, headers, data.oldValue);
+final byte[] newData = newValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.newValue);
+final byte[] oldData = oldValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.oldValue);
+
+final int newDataLength = newData.length;
+final int capacity = NEW_DATA_LENGTH_BYTES_SIZE + newDataLength + oldData.length;
+
+return ByteBuffer
+.allocate(capacity)
+.putInt(newDataLength)
+.put(newData)
+.put(oldData)
+.array();

Review comment:
With the change to the `ChangedSerializer` and `ChangedDeserializer` classes, I don’t think users will be able to just upgrade from a previous version of Kafka Streams easily. Any messages that were "inflight" prior to the upgrade will fail to deserialize correctly. Not sure how these types of “breaking” changes are typically handled. Is it simply a matter of noting this in the relevant upgrade doc? Or do we want to write more code to handle upgrade scenarios?
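The new `serialize` writes a 4-byte big-endian length of the new value, then the new bytes, then the old bytes, with a null side written as zero bytes. A minimal sketch of that layout and of splitting it back apart, using only `java.nio.ByteBuffer`; `ChangeLayoutSketch` is a hypothetical name, `NEW_DATA_LENGTH_BYTES_SIZE` is assumed to equal `Integer.BYTES`, and this is not the PR's `ChangedDeserializer`. It also makes the upgrade concern concrete: bytes written by the old serializer contain only the serialized value, so their first four bytes get read as a length.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the [newLength][newData][oldData] layout.
final class ChangeLayoutSketch {
    // Assumed to match NEW_DATA_LENGTH_BYTES_SIZE in the PR: one int.
    static final int NEW_DATA_LENGTH_BYTES_SIZE = Integer.BYTES;

    // Mirrors the shape of the PR's serialize(): a null side becomes zero bytes.
    static byte[] encode(byte[] newData, byte[] oldData) {
        byte[] n = newData == null ? new byte[0] : newData;
        byte[] o = oldData == null ? new byte[0] : oldData;
        return ByteBuffer.allocate(NEW_DATA_LENGTH_BYTES_SIZE + n.length + o.length)
                .putInt(n.length)
                .put(n)
                .put(o)
                .array();
    }

    // Splits the bytes back into {newData, oldData}; rejects impossible lengths.
    static byte[][] decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int newLength = buf.getInt(); // big-endian by default
        if (newLength < 0 || newLength > buf.remaining()) {
            throw new IllegalArgumentException("Invalid new-value length: " + newLength);
        }
        byte[] newData = new byte[newLength];
        buf.get(newData);
        byte[] oldData = new byte[buf.remaining()];
        buf.get(oldData);
        return new byte[][] {newData, oldData};
    }
}
```

For example, a legacy value that was a serialized `long` of 5 (`00 00 00 00 00 00 00 05`) decodes silently as an empty new value and a 4-byte old value, while legacy UTF-8 bytes such as `"hello"` yield an implausible length and are rejected by the guard. Without such a guard, a decoder could either throw or split the record at the wrong offset.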
[GitHub] [kafka] DuongPTIT opened a new pull request #10750: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka
DuongPTIT opened a new pull request #10750:
URL: https://github.com/apache/kafka/pull/10750

Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka on Kafka version 2.5.

PTAL @huxihx @kkonstantine. Many thanks.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-12840) Removing `compact` cleaning on a topic should abort on-going compactions
David Jacot created KAFKA-12840:
---
Summary: Removing `compact` cleaning on a topic should abort on-going compactions
Key: KAFKA-12840
URL: https://issues.apache.org/jira/browse/KAFKA-12840
Project: Kafka
Issue Type: Improvement
Reporter: David Jacot
Assignee: David Jacot

When `compact` is removed from the `cleanup.policy` of a topic, the compactions of that topic should be aborted.
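The rule in the ticket reduces to comparing the old and new `cleanup.policy` values, which Kafka accepts as a comma-separated list (for example `compact,delete`). A minimal sketch of that comparison; `CleanupPolicyChange` and `shouldAbortCompaction` are hypothetical names, and where such a check would hook into the log cleaner is not shown:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: detect that "compact" was dropped from a topic's cleanup.policy.
final class CleanupPolicyChange {
    // cleanup.policy is a comma-separated list, e.g. "delete", "compact", "compact,delete".
    static Set<String> parse(String policy) {
        return Arrays.stream(policy.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    // True when the old policy included compaction and the new one does not.
    static boolean shouldAbortCompaction(String oldPolicy, String newPolicy) {
        return parse(oldPolicy).contains("compact") && !parse(newPolicy).contains("compact");
    }
}
```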
[jira] [Commented] (KAFKA-12835) Topic IDs can mismatch on brokers (after interbroker protocol version update)
[ https://issues.apache.org/jira/browse/KAFKA-12835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350330#comment-17350330 ]

Ivan Yurchenko commented on KAFKA-12835:

bq. I was curious about your upgrade process. Is there a reason you moved from old brokers to new brokers rather than doing a rolling restart of the same brokers?

We try to keep machines immutable; nothing specific to Kafka itself.

bq. I believe I found the cause of this issue and will be working on a fix.

Great to hear this! Could you please share the idea at a higher level?

> Topic IDs can mismatch on brokers (after interbroker protocol version update)
> -
>
> Key: KAFKA-12835
> URL: https://issues.apache.org/jira/browse/KAFKA-12835
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.8.0
> Reporter: Ivan Yurchenko
> Assignee: Justine Olshan
> Priority: Major
>
> We had a Kafka cluster running version 2.8 with the interbroker protocol set to 2.7. It had a number of topics and everything was fine.
> Then we decided to update the interbroker protocol to 2.8 with the following procedure:
> 1. Run new brokers with the interbroker protocol set to 2.8.
> 2. Move the data from the old brokers to the new ones (normal partition reassignment API).
> 3. Decommission the old brokers.
> At stage 2 we had a problem: old brokers started failing on {{LeaderAndIsrRequest}} handling with
> {code:java}
> ERROR [Broker id=<...>] Topic Id in memory: <...> does not match the topic Id for partition <...> provided in the request: <...>. (state.change.logger)
> {code}
> for multiple topics. Topics were not recreated.
> We checked the {{partition.metadata}} files and the IDs there were indeed different from the values in ZooKeeper. It was fixed by deleting the metadata files (and letting them be recreated).
>
> The logs, unfortunately, didn't show anything that might point to the cause of the issue (or it happened longer ago than we keep logs).
> We also tried to reproduce this, but without success.
> If the community can point out what to check or beware of in future, that would be great. We'll be happy to provide additional information if needed. Thank you!
> Sorry if this ticket is not very actionable. We hope to at least raise awareness of this issue.
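The error in the report comes from comparing a partition's stored topic ID with the one carried in a `LeaderAndIsrRequest`. The sketch below is hypothetical: it uses `java.util.UUID` as a stand-in for Kafka's own ID type, and it assumes that a partition with no stored ID simply adopts the ID from the request, which would be consistent with the report's observation that deleting `partition.metadata` cleared the error. It is not the broker's actual code:

```java
import java.util.UUID;

// Hypothetical sketch of a stored-vs-requested topic ID check.
// java.util.UUID stands in for Kafka's own ID type.
final class TopicIdCheck {
    enum Outcome { ADOPT_REQUEST_ID, MATCH, MISMATCH }

    // storedId is null when no ID has been persisted for the partition yet
    // (for example, after its partition.metadata file was deleted).
    static Outcome check(UUID storedId, UUID requestId) {
        if (storedId == null) {
            return Outcome.ADOPT_REQUEST_ID;
        }
        return storedId.equals(requestId) ? Outcome.MATCH : Outcome.MISMATCH;
    }
}
```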
[jira] [Issue Comment Deleted] (KAFKA-8120) Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka
[ https://issues.apache.org/jira/browse/KAFKA-8120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pham Huy Hoang updated KAFKA-8120:
--
Comment: was deleted

(was: Hi [~wj1918], I tried to test following your steps above. However, I got a puzzling consumer error. When I added "batch.size=1" in config/connect-file-source.properties, my console-consumer received only 1985 / 2000 records (with records generated by your mongodump script), and only 96/100 records (with your test.txt file). Sometimes it received only 1 record each time I restarted kafka-connect. Please tell me why. Many thanks.)

> Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka
> -
>
> Key: KAFKA-8120
> URL: https://issues.apache.org/jira/browse/KAFKA-8120
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.1.1, 2.5.0
> Environment: Ubuntu 16.04 LTS
> Reporter: Prashant Shahi
> Assignee: Jun Wang
> Priority: Major
> Attachments: gen-mongodump.sh.txt, test.txt
>
> I have a large MongoDump JSON which I tried pushing to Kafka using Kafka Connect.
> I am getting the following exception after around 16k messages have been pushed.
> After the exception, the program isn't killed and doesn't exit, but no more messages are pushed.
> {code:java}
> [2019-03-15 08:48:13,812] ERROR WorkerSourceTask{id=od-test18-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
> java.lang.NegativeArraySizeException
> at org.apache.kafka.connect.file.FileStreamSourceTask.poll(FileStreamSourceTask.java:141)
> at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
> at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> [2019-03-15 08:48:13,814] ERROR WorkerSourceTask{id=od-test18-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
> {code}
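The report does not identify the root cause. One common way a `NegativeArraySizeException` arises is signed `int` overflow when a buffer's capacity is doubled; whether `FileStreamSourceTask.poll` grows its buffer this way is an assumption here, not something the stack trace confirms. A minimal sketch of the overflow and of a capped alternative, with hypothetical names:

```java
// Hypothetical sketch: int overflow when doubling a buffer, and a capped alternative.
final class BufferGrowth {
    // Naive growth: once current exceeds Integer.MAX_VALUE / 2, current * 2 wraps negative.
    static int naiveNextCapacity(int current) {
        return current * 2;
    }

    // Capped growth: doubles in long arithmetic and never exceeds maxCapacity.
    static int cappedNextCapacity(int current, int maxCapacity) {
        if (current >= maxCapacity) {
            throw new IllegalStateException("Buffer already at maximum capacity " + maxCapacity);
        }
        long doubled = (long) current * 2;
        return (int) Math.min(doubled, (long) maxCapacity);
    }
}
```

`naiveNextCapacity(1 << 30)` returns `Integer.MIN_VALUE`, and allocating an array of that size throws `NegativeArraySizeException` before any memory is reserved. The capped version lets the caller decide what to do once the limit is reached instead of failing with an opaque exception.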
[jira] [Assigned] (KAFKA-12461) Extend LogManager to cover the metadata topic
[ https://issues.apache.org/jira/browse/KAFKA-12461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

loboxu reassigned KAFKA-12461:
--
Assignee: loboxu

> Extend LogManager to cover the metadata topic
> -
>
> Key: KAFKA-12461
> URL: https://issues.apache.org/jira/browse/KAFKA-12461
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Jason Gustafson
> Assignee: loboxu
> Priority: Major
>
> The `@metadata` topic is not managed by `LogManager` since it uses a new snapshot-based retention policy. This means that it is not covered by the recovery and high watermark checkpoints. It would be useful to fix this. We can either extend `LogManager` so that it is aware of the snapshotting semantics implemented by the `@metadata` topic, or we can create something like a `RaftLogManager`.
[jira] [Commented] (KAFKA-8120) Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka
[ https://issues.apache.org/jira/browse/KAFKA-8120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350289#comment-17350289 ]

Pham Huy Hoang commented on KAFKA-8120:
---

Hi [~wj1918], I tried to test following your steps above. However, I got a puzzling consumer error. When I added "batch.size=1" in config/connect-file-source.properties, my console-consumer received only 1985 / 2000 records (with records generated by your mongodump script), and only 96/100 records (with your test.txt file). Sometimes it received only 1 record each time I restarted kafka-connect. Please tell me why. Many thanks.
[jira] [Commented] (KAFKA-12805) Aborted send could have a different exception than DisconnectException
[ https://issues.apache.org/jira/browse/KAFKA-12805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350279#comment-17350279 ]

Luke Chen commented on KAFKA-12805:
---

[~nicolas.guyomar], I tried setting {{request.timeout.ms}} to a small value to reproduce your issue, but I only got the exception:
{code:java}
org.apache.kafka.common.errors.TimeoutException: The request timed out.{code}
Could you tell me how I could reproduce your error? Thank you.

> Aborted send could have a different exception than DisconnectException
> --
>
> Key: KAFKA-12805
> URL: https://issues.apache.org/jira/browse/KAFKA-12805
> Project: Kafka
> Issue Type: Wish
> Components: network
> Reporter: Nicolas Guyomar
> Assignee: Luke Chen
> Priority: Minor
>
> Right now we are treating a timeout in the network client as a disconnection exception, which "hides" legitimate timeouts where increasing {{request.timeout.ms}} could be considered OK when there is no "real" network disconnection:
> Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeConfigs, deadlineMs=1616147081029) timed out at 1616147081039 after 2 attempt(s)
> Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled describeConfigs request with correlation id 8 due to node 1 being disconnected
>
> The DisconnectException is thrown because of the disconnect flag being set to true in
> [https://github.com/apache/kafka/blob/3d0b4d910b681df7d873c8a0285eaca01d6c173a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L352]
> While we _could_ have a different path from there
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L793]
> that would propagate the fact that the connection timed out because of {{request.timeout.ms}} expiration, and adjust the exception thrown later in
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L1195]
> so that it's not a {{DisconnectException}}?
>
> Thank you
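The wish amounts to carrying the reason for a closed connection through to the point where the exception is created. A hypothetical sketch of that idea; `DisconnectCause`, `RequestTimedOutException`, and `NodeDisconnectedException` are illustrative stand-ins, not names from `NetworkClient`, `KafkaAdminClient`, or Kafka's real exception classes:

```java
// Hypothetical sketch: keep the cause of an aborted send so the caller can raise
// a timeout-specific error instead of a generic disconnect.
final class AbortedSendSketch {
    enum DisconnectCause { NETWORK_FAILURE, REQUEST_TIMEOUT }

    static final class RequestTimedOutException extends RuntimeException {
        RequestTimedOutException(String message) {
            super(message);
        }
    }

    static final class NodeDisconnectedException extends RuntimeException {
        NodeDisconnectedException(String message) {
            super(message);
        }
    }

    // Picks the exception type from the recorded cause rather than always reporting a disconnect.
    static RuntimeException exceptionFor(DisconnectCause cause, String callName, int nodeId) {
        if (cause == DisconnectCause.REQUEST_TIMEOUT) {
            return new RequestTimedOutException(
                    "Cancelled " + callName + " request: request.timeout.ms expired for node " + nodeId);
        }
        return new NodeDisconnectedException(
                "Cancelled " + callName + " request due to node " + nodeId + " being disconnected");
    }
}
```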
[jira] [Commented] (KAFKA-12773) Use UncheckedIOException when wrapping IOException
[ https://issues.apache.org/jira/browse/KAFKA-12773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350274#comment-17350274 ]

loboxu commented on KAFKA-12773:

[~jagsancio] Could you please tell me your GitHub account name? Please review the code for me: [https://github.com/apache/kafka/pull/10749]

> Use UncheckedIOException when wrapping IOException
> --
>
> Key: KAFKA-12773
> URL: https://issues.apache.org/jira/browse/KAFKA-12773
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Jose Armando Garcia Sancio
> Assignee: loboxu
> Priority: Major
> Labels: kip-500, newbie
>
> The {{raft}} module may not be fully consistent on this, but in general in that module we have decided not to throw the checked {{IOException}}. We have been avoiding checked {{IOException}} exceptions by wrapping them in {{RuntimeException}}. The {{raft}} module should instead wrap {{IOException}} in {{UncheckedIOException}}. This change should be limited to the {{raft}} module.
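A minimal sketch of the pattern the ticket asks for, using only the JDK: the checked `IOException` is caught and rethrown as `UncheckedIOException` rather than a plain `RuntimeException`, so callers can still catch I/O failures specifically and recover the original exception with `getCause()`. The class name, method name, and message text are illustrative; the actual change lives in the `raft` module (for example `FileBasedStateStore`).

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Illustrative sketch: wrap the checked IOException in UncheckedIOException,
// keeping the original exception as the cause.
final class UncheckedReadSketch {
    static String readState(File file) {
        try {
            return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(
                    String.format("Error reading state from file %s", file), e);
        }
    }
}
```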
[GitHub] [kafka] socutes opened a new pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException
socutes opened a new pull request #10749:
URL: https://github.com/apache/kafka/pull/10749

The raft module may not be fully consistent on this, but in general in that module we have decided not to throw the checked IOException. We have been avoiding checked IOException exceptions by wrapping them in RuntimeException. The raft module should instead wrap IOException in UncheckedIOException. This change should be limited to the raft module.