[jira] [Resolved] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-9295. --- Resolution: Fixed > KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable > -- > > Key: KAFKA-9295 > URL: https://issues.apache.org/jira/browse/KAFKA-9295 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.4.0, 2.6.0 >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > Fix For: 3.0.0 > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/] > {quote}java.lang.AssertionError: Did not receive all 1 records from topic > output- within 6 ms Expected: is a value equal to or greater than <1> > but: <0> was less than <1> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200) > at > 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #10715: KAFKA-9295: increase heartbeat and session timeout
ableegoldman commented on pull request #10715: URL: https://github.com/apache/kafka/pull/10715#issuecomment-843771446 Merged to trunk at exactly 11pm PST (May 18th 2021) -- be on the lookout for any failures on builds kicked off after this time -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #10715: KAFKA-9295: increase heartbeat and session timeout
ableegoldman merged pull request #10715: URL: https://github.com/apache/kafka/pull/10715
[GitHub] [kafka] zhaohaidao commented on a change in pull request #10700: KAFKA-12789: Remove Stale comments for meta response handling logic
zhaohaidao commented on a change in pull request #10700:
URL: https://github.com/apache/kafka/pull/10700#discussion_r634933714

## File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ##
@@ -1089,8 +1089,8 @@ public void handleSuccessfulResponse(RequestHeader requestHeader, long now, Meta
 if (!errors.isEmpty())
 log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);
-// Don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
-// created which means we will get errors and no nodes until it exists
+// Don't update the cluster if there are no valid nodes which may happened during the startup phase of

Review comment: make sense
[GitHub] [kafka] abbccdda commented on a change in pull request #10700: KAFKA-12789: Remove Stale comments for meta response handling logic
abbccdda commented on a change in pull request #10700:
URL: https://github.com/apache/kafka/pull/10700#discussion_r634931818

## File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ##
@@ -1089,8 +1089,8 @@ public void handleSuccessfulResponse(RequestHeader requestHeader, long now, Meta
 if (!errors.isEmpty())
 log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);
-// Don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
-// created which means we will get errors and no nodes until it exists
+// Don't update the cluster if there are no valid nodes which may happened during the startup phase of

Review comment: may happened -> may happen. How about rephrasing to `When talking to the startup phase of a broker, it is possible to receive an empty metadata set, which we should retry later.`
[GitHub] [kafka] showuon edited a comment on pull request #10715: KAFKA-9295: increase heartbeat and session timeout
showuon edited a comment on pull request #10715:
URL: https://github.com/apache/kafka/pull/10715#issuecomment-843766426

@ableegoldman, Failed test is unrelated and flaky. Thanks.
```
Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
```
[GitHub] [kafka] showuon commented on pull request #10715: KAFKA-9295: increase heartbeat and session timeout
showuon commented on pull request #10715:
URL: https://github.com/apache/kafka/pull/10715#issuecomment-843766426

Failed test is unrelated and flaky.
```
Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
```
[GitHub] [kafka] abbccdda closed pull request #9311: KAFKA-9910: Implement new transaction timed out error
abbccdda closed pull request #9311: URL: https://github.com/apache/kafka/pull/9311
[jira] [Commented] (KAFKA-10201) Update codebase to use more inclusive terms
[ https://issues.apache.org/jira/browse/KAFKA-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17347311#comment-17347311 ]

Vitaly Gerasimov commented on KAFKA-10201:

What about changing the Kafka "black" logo (sorry for the "black" naming) to rainbow colors or something else?

> Update codebase to use more inclusive terms
> ---
>
> Key: KAFKA-10201
> URL: https://issues.apache.org/jira/browse/KAFKA-10201
> Project: Kafka
> Issue Type: Improvement
> Reporter: Xavier Léauté
> Priority: Major
> Fix For: 3.0.0
>
> see the corresponding KIP
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-629:+Use+racially+neutral+terms+in+our+codebase
[GitHub] [kafka] dajac commented on pull request #10326: Avoid newly replicating brokers in RackAwareReplicaSelector
dajac commented on pull request #10326:
URL: https://github.com/apache/kafka/pull/10326#issuecomment-843754057

@lizthegrey Please excuse the delay on this one. I did not have the time to really look into it.

At the moment, it is not clear to me that the PR actually solves the problem that you hit. The part that bugs me a bit is the following. If you look at the [code](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1260) which selects the read replica, it only selects the follower replicas which have the requested fetch offset. My understanding is that the consumer was requesting an offset that the newly added replica did not have yet. Therefore, when the consumer was restarted, the broker should NOT have selected the newly added replica. If it did, I wonder how that could have happened. Do you see any indication in the logs that the consumer got a preferred read replica from the leader?

I wonder if the unclean leader election played a role in this. Would you have logs (controller, state changes, etc.) for that partition during that time? We need to reconstruct the sequence of events to better understand the case.
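The selection rule described in the comment above (only followers that already hold the requested fetch offset are candidates) can be sketched roughly as follows. This is a simplified illustration, not the actual `ReplicaManager` code: the `ReplicaInfo` class, the `eligibleReadReplicas` method, and the offset field names are all hypothetical, and the real implementation may apply additional conditions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ReadReplicaFilterSketch {
    // Hypothetical, simplified view of the log range held by one follower replica.
    static final class ReplicaInfo {
        final int brokerId;
        final long logStartOffset;
        final long logEndOffset;

        ReplicaInfo(int brokerId, long logStartOffset, long logEndOffset) {
            this.brokerId = brokerId;
            this.logStartOffset = logStartOffset;
            this.logEndOffset = logEndOffset;
        }
    }

    // Keep only the followers whose log already covers the requested fetch offset.
    static List<Integer> eligibleReadReplicas(List<ReplicaInfo> followers, long fetchOffset) {
        List<Integer> eligible = new ArrayList<>();
        for (ReplicaInfo replica : followers) {
            if (replica.logStartOffset <= fetchOffset && fetchOffset <= replica.logEndOffset) {
                eligible.add(replica.brokerId);
            }
        }
        return eligible;
    }

    public static void main(String[] args) {
        // Broker 2 stands in for a newly added replica that is still catching up.
        List<ReplicaInfo> followers = Arrays.asList(
            new ReplicaInfo(1, 0L, 500L),
            new ReplicaInfo(2, 0L, 100L));
        System.out.println(eligibleReadReplicas(followers, 300L));
    }
}
```

Under this rule, a replica that is still catching up would not be offered for an offset beyond its log end, which is why the reported behaviour is surprising.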
[GitHub] [kafka] showuon commented on pull request #10722: MINOR: update java doc for deprecated methods
showuon commented on pull request #10722: URL: https://github.com/apache/kafka/pull/10722#issuecomment-843715792 @mjsax , could you please review this small PR to update java doc? Thanks.
[GitHub] [kafka] showuon opened a new pull request #10722: MINOR: update java doc for deprecated methods
showuon opened a new pull request #10722:
URL: https://github.com/apache/kafka/pull/10722

Update deprecated methods:
1. `KStream#through(String, Produced)`
2. `KafkaConsumer#poll(long)`

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] DuongPTIT closed pull request #10685: KAFKA-12380 Executor in Connect's Worker is not shut down when the worker is
DuongPTIT closed pull request #10685: URL: https://github.com/apache/kafka/pull/10685
[GitHub] [kafka] showuon commented on a change in pull request #10714: MINOR: add ConfigUtils method for printing configurations
showuon commented on a change in pull request #10714:
URL: https://github.com/apache/kafka/pull/10714#discussion_r634877755

## File path: clients/src/test/java/org/apache/kafka/common/utils/ConfigUtilsTest.java ##
@@ -140,4 +144,28 @@ public void testMultipleDeprecations() {
 assertNull(newConfig.get("foo.bar.deprecated"));
 assertNull(newConfig.get("foo.bar.even.more.deprecated"));
 }
+
+private static final ConfigDef CONFIG = new ConfigDef().
+define("foo", Type.PASSWORD, Importance.HIGH, "").
+define("bar", Type.STRING, Importance.HIGH, "").
+define("quux", Type.INT, Importance.HIGH, "").
+define("blah", Type.STRING, Importance.HIGH, "");
+
+@Test
+public void testConfigMapToRedactedStringForEmptyMap() {
+assertEquals("{}", ConfigUtils.
+configMapToRedactedString(Collections.emptyMap(), CONFIG));
+}
+
+@Test
+public void testConfigMapToRedactedStringWithSecrets() {
+Map testMap1 = new HashMap<>();
+testMap1.put("bar", "whatever");
+testMap1.put("quux", Integer.valueOf(123));
+testMap1.put("foo", "foosecret");
+testMap1.put("blah", null);
+testMap1.put("quuux", Integer.valueOf(456));
+assertEquals("{bar=\"whatever\", blah=null, foo=(redacted), quuux=(redacted), quux=123}",

Review comment: `nit`: could we rename the configs to more meaningful names, so that a reader can tell from the name which type each config has and what result to expect? For example:
```
define("passwordConfig", Type.PASSWORD, Importance.HIGH, "").
define("stringConfig", Type.STRING, Importance.HIGH, "")
...
testMap1.put("stringConfig", "whatever");
testMap1.put("passwordConfig", "foosecret");
testMap1.put("undefinedConfig", Integer.valueOf(456));
```
Then the assertion would read:
```
assertEquals("{stringConfig=\"whatever\", ..., passwordConfig=(redacted), undefinedConfig=(redacted), ...}",
```
What do you think?
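To make the behaviour under test easier to follow, here is a minimal, self-contained sketch of the redaction rule that the expected string in the diff implies: keys are printed in sorted order, string values are quoted, and both password-typed and undefined keys are shown as `(redacted)`. It uses only the JDK and is an assumption about the behaviour, not the code from the PR: `RedactionSketch`, its `Type` enum, and `toRedactedString` are hypothetical stand-ins for `ConfigUtils.configMapToRedactedString` and `ConfigDef`. The config names follow the reviewer's suggestion.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

final class RedactionSketch {
    // Hypothetical stand-in for the config types used in the test above.
    enum Type { STRING, INT, PASSWORD }

    // Render a config map, hiding password-typed values and any key with no known type.
    static String toRedactedString(Map<String, Object> values, Map<String, Type> known) {
        StringJoiner out = new StringJoiner(", ", "{", "}");
        // A TreeMap copy gives a stable, sorted key order in the output.
        for (Map.Entry<String, Object> entry : new TreeMap<>(values).entrySet()) {
            Type type = known.get(entry.getKey());
            Object value = entry.getValue();
            String shown;
            if (type == null || type == Type.PASSWORD) {
                shown = "(redacted)";
            } else if (value == null) {
                shown = "null";
            } else if (value instanceof String) {
                shown = "\"" + value + "\"";
            } else {
                shown = String.valueOf(value);
            }
            out.add(entry.getKey() + "=" + shown);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Type> known = new HashMap<>();
        known.put("stringConfig", Type.STRING);
        known.put("passwordConfig", Type.PASSWORD);
        Map<String, Object> values = new HashMap<>();
        values.put("stringConfig", "whatever");
        values.put("passwordConfig", "foosecret");
        values.put("undefinedConfig", 456);
        System.out.println(toRedactedString(values, known));
    }
}
```

With descriptive names, each entry in the rendered string shows which rule produced it, which is the readability gain the review comment is after.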
[jira] [Commented] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence
[ https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17347269#comment-17347269 ] NEERAJ VAIDYA commented on KAFKA-12776: --- Thanks [~ableegoldman] I'd imagined KIP-739 would be a solution to this issue, just that [~guozhang] reckons otherwise. I will read it closer to see if it solves the reordering issue. > Producer sends messages out-of-order inspite of enabling idempotence > > > Key: KAFKA-12776 > URL: https://issues.apache.org/jira/browse/KAFKA-12776 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0, 2.7.0 > Environment: Linux RHEL 7.9 and Ubuntu 20.04 >Reporter: NEERAJ VAIDYA >Priority: Major > Attachments: mocker.zip > > > I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). > My application is basically a Spring boot web-application which accepts JSON > payloads via HTTP and then pushes each to a Kafka topic. I also use Spring > Cloud Stream Kafka in the application to create and use a Producer. > For one of my failure handling test cases, I shutdown the Kafka cluster while > my applications are running. (Note : No messages have been published to the > Kafka cluster before I stop the cluster) > When the producer application tries to write messages to TA, it cannot > because the cluster is down and hence (I assume) buffers the messages. Let's > say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is > first and m4 is last). > When I bring the Kafka cluster back online, the producer sends the buffered > messages to the topic, but they are not in order. I receive for example, m2 > then m3 then m1 and then m4. > Why is that ? Is it because the buffering in the producer is multi-threaded > with each producing to the topic at the same time ? > My project code is attached herewith. > I can confirm that I have enabled idempotence. 
I have also tried with > ```max.in.flight.requests=1``` -- This message was sent by Atlassian Jira (v8.3.4#803005)
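For reference, the settings mentioned in the report correspond to the producer properties sketched below. This is a minimal illustration using plain `java.util.Properties`; the class name, the `build` helper, and the `localhost:9092` address are placeholders, not taken from the attached project. Note that the full producer key is `max.in.flight.requests.per.connection`; the shorter `max.in.flight.requests` quoted in the report is not a producer setting on its own. The report indicates that the reordering was still observed with these settings, so the sketch only documents the configuration and is not a fix.

```java
import java.util.Properties;

final class OrderedProducerConfigSketch {
    // Build producer properties matching the settings described in the report.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Idempotence de-duplicates retried batches within a producer session.
        props.setProperty("enable.idempotence", "true");
        // Idempotence requires acknowledgement from all in-sync replicas.
        props.setProperty("acks", "all");
        // At most one unacknowledged request per broker connection.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder address.
        build("localhost:9092").forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```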
[GitHub] [kafka] ViE-17-tuoi closed pull request #10721: Feature/kafka 12380
ViE-17-tuoi closed pull request #10721: URL: https://github.com/apache/kafka/pull/10721
[GitHub] [kafka] showuon commented on pull request #10715: KAFKA-9295: increase heartbeat and session timeout
showuon commented on pull request #10715: URL: https://github.com/apache/kafka/pull/10715#issuecomment-843693730 I'll monitor it and let you know when completed. :)
[GitHub] [kafka] showuon commented on pull request #10716: optimize offset reset strategy and fix lose data when add partition
showuon commented on pull request #10716: URL: https://github.com/apache/kafka/pull/10716#issuecomment-843693354 Also, there are code conflicts with the `trunk` branch. Please merge `trunk` and push again. Thanks.
[GitHub] [kafka] ViE-17-tuoi opened a new pull request #10721: Feature/kafka 12380
ViE-17-tuoi opened a new pull request #10721:
URL: https://github.com/apache/kafka/pull/10721

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence
[ https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17347259#comment-17347259 ]

A. Sophie Blee-Goldman commented on KAFKA-12776:

[~neeraj.vaidya] although the ongoing [KIP-739|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446] isn't actually trying to solve the same problem, I expect there might be considerable overlap in the implementation and design considerations between that KIP and what you're trying to accomplish. I recommend starting out by reading up on that KIP to see how the current design aligns with your thinking. Admittedly I haven't been following it closely, but my impression is that the proposal involves introducing another queue that sits outside the regular partitioned buffers. It sounds like this approach could be a good fit for solving the re-ordering problem as well. If you agree, you can follow up by sending a message to the KIP-739 discussion thread and outlining how their design could apply to your case, and whether that's something that could be worked into this KIP and its general design. I suspect it would not require much additional work or expand the current scope too much, but if it does maybe you can offer to help out with this aspect of the KIP.
[jira] [Commented] (KAFKA-12811) kafka-topics.sh should let the user know they cannot adjust the replication factor for a topic using the --alter flag and not warn about missing the --partition flag
[ https://issues.apache.org/jira/browse/KAFKA-12811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17347257#comment-17347257 ]

Luke Chen commented on KAFKA-12811:

[~rjoerger], I've moved this ticket to "Patch Available" for you.

> kafka-topics.sh should let the user know they cannot adjust the replication factor for a topic using the --alter flag and not warn about missing the --partition flag
> -
>
> Key: KAFKA-12811
> URL: https://issues.apache.org/jira/browse/KAFKA-12811
> Project: Kafka
> Issue Type: Improvement
> Components: admin
> Reporter: Richard Joerger
> Priority: Trivial
>
> Currently the kafka-topics.sh tool returns the following message when trying to alter the replication factor for a topic:
>
> {code:java}
> $ ./bin/kafka-topics.sh --bootstrap-server _host_:9092 --alter --topic testTopic --replication-factor 4
> 21/05/18 13:23:54 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
> Missing required argument "[partitions]"
> {code}
> However, since the appropriate way to adjust the replication factor is the kafka-reassign-partitions tool, I believe kafka-topics.sh should instead tell the user that this is not the way to adjust the replication factor. It would look like this:
> {code:java}
> $ ./bin/kafka-topics.sh --bootstrap-server _host_:9092 --alter --topic testTopic --replication-factor 4
> Option "[replication-factor]" can't be used with option "[alter]"
> {code}
> A very minor improvement, but it should help alleviate what seems a little unintuitive to some.
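The ordering of checks that the ticket asks for can be sketched as below: reject the incompatible option pair first, and only then fall back to the missing-argument check. This is a hypothetical, JDK-only illustration; `TopicOptionCheckSketch` and `validate` are made-up names, and the real tool's option parsing is not shown. The two messages are the ones quoted in the ticket.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

final class TopicOptionCheckSketch {
    // Return an error message for the given option names, or empty if they are acceptable.
    static Optional<String> validate(Set<String> options) {
        // Check the incompatible pair first so the user sees the more helpful message.
        if (options.contains("alter") && options.contains("replication-factor")) {
            return Optional.of("Option \"[replication-factor]\" can't be used with option \"[alter]\"");
        }
        if (options.contains("alter") && !options.contains("partitions")) {
            return Optional.of("Missing required argument \"[partitions]\"");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Set<String> options = new HashSet<>(
            Arrays.asList("bootstrap-server", "alter", "topic", "replication-factor"));
        System.out.println(validate(options).orElse("ok"));
    }
}
```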
[GitHub] [kafka] mjsax commented on a change in pull request #10720: MINOR: preserve timestamp when getting value from upstream state store
mjsax commented on a change in pull request #10720:
URL: https://github.com/apache/kafka/pull/10720#discussion_r634859119

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java ##
@@ -141,16 +144,32 @@ public void close() {
 @Override
 public void init(final ProcessorContext context) {
+internalProcessorContext = (InternalProcessorContext) context;
 parentGetter.init(context);
 valueTransformer.init(new ForwardingDisabledProcessorContext(context));
 }
 @Override
 public ValueAndTimestamp get(final K key) {
 final ValueAndTimestamp valueAndTimestamp = parentGetter.get(key);
-return ValueAndTimestamp.make(
+
+final ProcessorRecordContext currentContext = internalProcessorContext.recordContext();

Review comment: This is the actual fix. Before calling `valueTransformer.transform()`, we need to set a different record context so that the `Transformer` can access the correct metadata of the record it processes. Before this fix, the context would contain the metadata (in particular the record timestamp) of the "currently processed record" that triggers the lookup. This breaks the applied `Transformer` if it accesses the record timestamp via `context.timestamp()`, which is supposed to return the timestamp of the record the `Transformer` processes (i.e., the timestamp stored next to the value in the state store).
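The pattern described in the review comment above (temporarily expose the stored record's timestamp while the transformer runs) can be sketched with plain JDK types as follows. This is not the Kafka Streams code: `Context`, `Stored`, and `getTransformed` are hypothetical stand-ins, and restoring the previous context in a `finally` block is an assumption suggested by the `currentContext` variable in the diff rather than something the excerpt shows.

```java
import java.util.function.BiFunction;

final class RecordContextSwapSketch {
    // Hypothetical stand-in for a processor context exposing the current record timestamp.
    static final class Context {
        private long timestamp;

        Context(long timestamp) {
            this.timestamp = timestamp;
        }

        long timestamp() {
            return timestamp;
        }

        void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    // Hypothetical stand-in for a value stored together with its timestamp.
    static final class Stored {
        final String value;
        final long timestamp;

        Stored(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Run the transformer while the context reports the stored record's timestamp,
    // then restore the timestamp of the record that triggered the lookup.
    static String getTransformed(Context context, String key, Stored stored,
                                 BiFunction<String, String, String> transformer) {
        long triggeringTimestamp = context.timestamp();
        context.setTimestamp(stored.timestamp);
        try {
            return transformer.apply(key, stored.value);
        } finally {
            context.setTimestamp(triggeringTimestamp);
        }
    }

    public static void main(String[] args) {
        Context context = new Context(100L);
        Stored stored = new Stored("v", 42L);
        // The transformer reads the timestamp through the context, as a user-supplied one might.
        String result = getTransformed(context, "k", stored, (k, v) -> v + "@" + context.timestamp());
        System.out.println(result + " / context afterwards: " + context.timestamp());
    }
}
```

Without the swap, the transformer in this sketch would observe the triggering record's timestamp instead of the one stored with the value, which is the bug the comment describes.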
[GitHub] [kafka] mjsax commented on a change in pull request #10720: MINOR: preserve timestamp when getting value from upstream state store
mjsax commented on a change in pull request #10720:
URL: https://github.com/apache/kafka/pull/10720#discussion_r634858161

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java ##
@@ -63,15 +63,14 @@ public void enableSendingOldValues() {
 private Sensor droppedRecordsSensor;
 private TimestampedTupleForwarder tupleForwarder;
-@SuppressWarnings("unchecked")

Review comment: While double-checking all classes that implement `KTableValueGetter`, I did some side cleanup in each file I checked.
[jira] [Comment Edited] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence
[ https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346805#comment-17346805 ]

NEERAJ VAIDYA edited comment on KAFKA-12776 at 5/19/21, 1:45 AM:

[~ableegoldman] , [~ijuma] ,[~guozhang] I am keen to get such a feature into the API and ready to work on a KIP, if that is what is needed.

was (Author: neeraj.vaidya):
[~guozhang] I am keen to get such a feature into the API and ready to work on a KIP, if that is what is needed.
[GitHub] [kafka] mjsax opened a new pull request #10720: MINOR: preserve timestamp when getting value from upstream state store
mjsax opened a new pull request #10720: URL: https://github.com/apache/kafka/pull/10720 When getting a value from an upstream state store, we should ensure that the record timestamp is accessible when applying a `Transformer`.
[GitHub] [kafka] showuon commented on pull request #10715: KAFKA-9295: increase heartbeat and session timeout
showuon commented on pull request #10715: URL: https://github.com/apache/kafka/pull/10715#issuecomment-843681372 @ableegoldman , no problem! Updated! Thanks.
[GitHub] [kafka] ryannedolan commented on pull request #10681: KIP 731 WIP Connect per-task record rate limiting
ryannedolan commented on pull request #10681: URL: https://github.com/apache/kafka/pull/10681#issuecomment-843668847 Failing tests seem unrelated (KRaft and Streams).
[jira] [Resolved] (KAFKA-10195) Move offset management codes from ConsumerCoordinator to a new class
[ https://issues.apache.org/jira/browse/KAFKA-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming resolved KAFKA-10195. Resolution: Won't Fix > Move offset management codes from ConsumerCoordinator to a new class > > > Key: KAFKA-10195 > URL: https://issues.apache.org/jira/browse/KAFKA-10195 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: dengziming >Assignee: dengziming >Priority: Minor > > ConsumerCoordinator has 2 main functions: > # partition assignment > # offset management > We are adding new features to it; for example, KAFKA-9657 adds a field > `throwOnFetchStableOffsetsUnsupported` which is only used in offset management. > The 2 functions almost never interact with each other, so it's not wise > to put this code in one single class. Can we try to move the offset management > code to a new class? > For example, the fields below are only used in offset management: > ``` > // can be moved to another class directly > private final OffsetCommitCallback defaultOffsetCommitCallback; > private final ConsumerInterceptors interceptors; > private final AtomicInteger pendingAsyncCommits; > private final ConcurrentLinkedQueue > completedOffsetCommits; > private AtomicBoolean asyncCommitFenced; > private final boolean throwOnFetchStableOffsetsUnsupported; > private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null; > > // used in `onJoinComplete` but can also be moved out. > private final boolean autoCommitEnabled; > private final int autoCommitIntervalMs; > private Timer nextAutoCommitTimer; > ``` > So we can just create a new class `OffsetManageCoordinator` and move the > related code into it. Similarly, a new class `SubscribeManager` can also be > created. Here is the UML class diagram: > !image-2020-06-28-19-50-26-570.png! > > The above is the current design, in which KafkaConsumer interacts with Consumer > directly.
Below is the new design: we add a `ConsumerCoordinatorFacade` > in which we put `OffsetCoordinator` and `SubscribeCoordinator` to manage > offsets and assignment respectively. Both `OffsetCoordinator` and > `SubscribeCoordinator` need an `AbstractCoordinator` because they will interact > with each other (even if rarely). > -- This message was sent by Atlassian Jira (v8.3.4#803005)
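The proposed split could be sketched roughly as follows. This is a minimal illustration, not Kafka code: `ConsumerCoordinatorFacade`, `OffsetCoordinator`, and `SubscribeCoordinator` are the names used in the description above, while every method, field, and type shown here is a hypothetical stand-in (partitions are plain strings, and no `AbstractCoordinator` is modeled).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only: these classes do not exist in Kafka in this form.
// Offset management state lives in one class...
final class OffsetCoordinator {
    private final Map<String, Long> committed = new HashMap<>();

    void commit(String topicPartition, long offset) {
        committed.put(topicPartition, offset);
    }

    Long committed(String topicPartition) {
        return committed.get(topicPartition);
    }
}

// ...and partition assignment state lives in another.
final class SubscribeCoordinator {
    private List<String> assignment = List.of();

    void onJoinComplete(List<String> assignedPartitions) {
        assignment = List.copyOf(assignedPartitions);
    }

    List<String> assignment() {
        return assignment;
    }
}

// The facade is the single entry point the consumer would talk to.
final class ConsumerCoordinatorFacade {
    private final OffsetCoordinator offsets = new OffsetCoordinator();
    private final SubscribeCoordinator subscriptions = new SubscribeCoordinator();

    void onJoinComplete(List<String> assignedPartitions) {
        subscriptions.onJoinComplete(assignedPartitions);
    }

    // The rare point of interaction: only commit for currently assigned partitions.
    boolean commit(String topicPartition, long offset) {
        if (!subscriptions.assignment().contains(topicPartition)) {
            return false;
        }
        offsets.commit(topicPartition, offset);
        return true;
    }

    Long committed(String topicPartition) {
        return offsets.committed(topicPartition);
    }
}
```

The point of the sketch is only that the two responsibilities share almost no state, so the facade needs very little glue between them.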
[GitHub] [kafka] mumrah merged pull request #10719: Fix compile errors for KAFKA-12543
mumrah merged pull request #10719: URL: https://github.com/apache/kafka/pull/10719 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #10719: Fix compile errors for KAFKA-12543
jsancio commented on a change in pull request #10719: URL: https://github.com/apache/kafka/pull/10719#discussion_r634746833 ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java ## @@ -63,7 +63,7 @@ public void close() { String.format( "Unable to close snapshot reader %s at %s", snapshotId, -fileRecords.file +fileRecords Review comment: I checked. `toString` is defined for `FileRecords`. It looks like it has enough information to debug the issue.
[GitHub] [kafka] jsancio commented on pull request #10719: Fix compile errors for KAFKA-12543
jsancio commented on pull request #10719: URL: https://github.com/apache/kafka/pull/10719#issuecomment-843561287 LGTM. Compiles locally and `KafkaMetadataLogTest` passes locally. Thanks @mumrah
[GitHub] [kafka] junrao commented on a change in pull request #10719: Fix compile errors for KAFKA-12543
junrao commented on a change in pull request #10719: URL: https://github.com/apache/kafka/pull/10719#discussion_r634743071 ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java ## @@ -63,7 +63,7 @@ public void close() { String.format( "Unable to close snapshot reader %s at %s", snapshotId, -fileRecords.file +fileRecords Review comment: Do we need to define the toString() method for FileRecords so that it prints well? ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -443,11 +443,12 @@ object KafkaMetadataLog { private def deleteSnapshotFiles( logDir: Path, -expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] +expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]], +logging: Logging ): () => Unit = () => { expiredSnapshots.foreach { case (snapshotId, snapshotReader) => snapshotReader.foreach { reader => -CoreUtils.swallow(reader.close(), this) +CoreUtils.swallow(reader.close(), logging) Review comment: This is an existing issue. But CoreUtils.swallow() doesn't seem to need logging.
[GitHub] [kafka] mumrah opened a new pull request #10719: Fix compile errors for KAFKA-12543
mumrah opened a new pull request #10719: URL: https://github.com/apache/kafka/pull/10719 Fixes for two compiler errors which were introduced in https://github.com/apache/kafka/pull/10431
[GitHub] [kafka] ableegoldman commented on a change in pull request #10646: KAFKA-8897 Follow-up: Consolidate the global state stores
ableegoldman commented on a change in pull request #10646: URL: https://github.com/apache/kafka/pull/10646#discussion_r634736745 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -174,13 +176,15 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name())); } +// register the store first, so that if later an exception is thrown then eventually while we call `close` Review comment: I definitely think we need to triage and maybe clean up the existing Illegal-type exceptions today. Some may not be deterministic, but we still just drop everything and shut down without any further attempts at cleaning up. In those cases it's probably down to the specific situation whether it's appropriate to continue doing so and disallow recovery from this, or just fix the handling so it does clean all resources
[jira] [Commented] (KAFKA-12812) Consider refactoring state store registration path
[ https://issues.apache.org/jira/browse/KAFKA-12812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17347153#comment-17347153 ] A. Sophie Blee-Goldman commented on KAFKA-12812: 100% agree. Imo the old way of passing around this config was also just super annoying to navigate, and a much more awkward interface for users. > Consider refactoring state store registration path > -- > > Key: KAFKA-12812 > URL: https://issues.apache.org/jira/browse/KAFKA-12812 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Priority: Major > > Today our state store registration call path within the stateManager (both > local and global) is like this: > {code} > for each store: store.init(store, context) >-> context.register(root, callback) >-> stateManager.registerStore(store, callback) > {code} > One can see that we have an awkward loop from stateManager back to > stateManager, and we require users not to forget to call > context.register(root, callback). We do this only in order to let users pass > the customized callback implementation to the stateManager. > What about a different path like this: > 1) We add a new interface in StateStore, like `StateRestoreCallback > getCallback()` that each impl class needs to provide. > 2) We remove the `context.register(root, callback)` call; and because of > that, we do not need to pass in `root` in the store.init as well. > 3) stateManager just calls `store.init(context)` (without the first > parameter), and then puts the store along with its restore callback into the > map, without the separate `registerStore` function.
[GitHub] [kafka] ableegoldman commented on pull request #10715: KAFKA-9295: increase heartbeat and session timeout
ableegoldman commented on pull request #10715: URL: https://github.com/apache/kafka/pull/10715#issuecomment-843543131 @showuon I don't think we need to increase the heartbeat interval, let's just bump up the session timeout for now. We still want to be relatively quick in our broker requests/responses
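For illustration, the suggestion above (raise only the session timeout, leave the heartbeat interval alone) could look like the following consumer override. This is a minimal sketch: `session.timeout.ms` and `heartbeat.interval.ms` are real consumer config keys, but the class name, the helper, and the value passed in are assumptions, not what PR #10715 actually contains.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only.
final class TimeoutConfigSketch {
    // The timeout value is supplied by the caller; no specific number is taken from the PR.
    static Properties consumerOverrides(int sessionTimeoutMs) {
        Properties props = new Properties();
        // Raise the session timeout so a slow test environment is less likely
        // to have its consumer kicked out of the group...
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        // ...but deliberately leave heartbeat.interval.ms unset so the client
        // default applies and heartbeats stay frequent.
        return props;
    }
}
```

Calling `TimeoutConfigSketch.consumerOverrides(30_000)` yields a single override; since `heartbeat.interval.ms` is never set, the client default stays in effect.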
[GitHub] [kafka] mimaison commented on pull request #10660: MINOR: Updating files with release 2.7.1
mimaison commented on pull request #10660: URL: https://github.com/apache/kafka/pull/10660#issuecomment-843514462 @mjsax / @chia7712 Can you take another look? Thanks
[GitHub] [kafka] ableegoldman merged pull request #10713: HOTFIX: undo renaming of public part of Subtopology API
ableegoldman merged pull request #10713: URL: https://github.com/apache/kafka/pull/10713
[GitHub] [kafka] ableegoldman commented on pull request #10713: HOTFIX: undo renaming of public part of Subtopology API
ableegoldman commented on pull request #10713: URL: https://github.com/apache/kafka/pull/10713#issuecomment-843513828 Some flaky test failures, Java11 build passed and this is a trivial renaming anyways. Merging
[GitHub] [kafka] mumrah merged pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model
mumrah merged pull request #10431: URL: https://github.com/apache/kafka/pull/10431
[GitHub] [kafka] guozhangwang commented on a change in pull request #10609: KAFKA-12648: Pt. 1 - Add NamedTopology to protocol and state directory structure
guozhangwang commented on a change in pull request #10609: URL: https://github.com/apache/kafka/pull/10609#discussion_r634626047 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -411,19 +442,37 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { } } } +maybeCleanEmptyNamedTopologyDirs(); Review comment: Should we move this into the try/catch IOException block as well (ditto below)? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -462,39 +512,49 @@ private void cleanRemovedTasksCalledByUser() throws Exception { * List all of the task directories that are non-empty * @return The list of all the non-empty local directories for stream tasks */ -File[] listNonEmptyTaskDirectories() { -final File[] taskDirectories; -if (!hasPersistentStores || !stateDir.exists()) { -taskDirectories = new File[0]; -} else { -taskDirectories = -stateDir.listFiles(pathname -> { -if (!pathname.isDirectory() || !TASK_DIR_PATH_NAME.matcher(pathname.getName()).matches()) { -return false; -} else { -return !taskDirIsEmpty(pathname); -} -}); -} - -return taskDirectories == null ? 
new File[0] : taskDirectories; +List listNonEmptyTaskDirectories() { +return listTaskDirectories(pathname -> { +if (!pathname.isDirectory() || !TASK_DIR_PATH_NAME.matcher(pathname.getName()).matches()) { +return false; +} else { +return !taskDirIsEmpty(pathname); +} +}); } /** - * List all of the task directories + * List all of the task directories along with their parent directory if they belong to a named topology * @return The list of all the existing local directories for stream tasks */ -File[] listAllTaskDirectories() { -final File[] taskDirectories; -if (!hasPersistentStores || !stateDir.exists()) { -taskDirectories = new File[0]; -} else { -taskDirectories = -stateDir.listFiles(pathname -> pathname.isDirectory() - && TASK_DIR_PATH_NAME.matcher(pathname.getName()).matches()); +List listAllTaskDirectories() { +return listTaskDirectories(pathname -> pathname.isDirectory() && TASK_DIR_PATH_NAME.matcher(pathname.getName()).matches()); +} + +private List listTaskDirectories(final FileFilter filter) { +final List taskDirectories = new ArrayList<>(); +if (hasPersistentStores && stateDir.exists()) { +if (hasNamedTopologies) { Review comment: Is it possible that we can have named topology state dirs and unamed (original) state dirs co-exist here? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -411,19 +442,37 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { } } } +maybeCleanEmptyNamedTopologyDirs(); +} + +private void maybeCleanEmptyNamedTopologyDirs() { Review comment: Could we just remove empty named topology dirs along the way instead of doing that in a second pass at the end? EDIT: nvm, after some thoughts I feel it is more complicated than easier. 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java ## @@ -125,6 +130,29 @@ public int errorCode() { return data.errorCode(); } +// For version > MIN_NAMED_TOPOLOGY_VERSION +private void setTaskOffsetSumDataWithNamedTopologiesFromTaskOffsetSumMap(final Map taskOffsetSums) { +final Map> topicGroupIdToPartitionOffsetSum = new HashMap<>(); +for (final Map.Entry taskEntry : taskOffsetSums.entrySet()) { +final TaskId task = taskEntry.getKey(); + topicGroupIdToPartitionOffsetSum.computeIfAbsent(task.topicGroupId, t -> new ArrayList<>()).add( +new SubscriptionInfoData.PartitionToOffsetSum() +.setPartition(task.partition) +.setOffsetSum(taskEntry.getValue())); +} + +data.setTaskOffsetSums(taskOffsetSums.entrySet().stream().map(t -> { +final SubscriptionInfoData.TaskOffsetSum taskOffsetSum = new SubscriptionInfoData.TaskOffsetSum(); +final TaskId task = t.getKey(); +taskOffsetSum.setTopicGroupId(task.topicGroupId); +taskOffsetSum.setPartition(task.partition); Review comment: Could you remind m
[GitHub] [kafka] soarez commented on pull request #10650: KAFKA-12763 NoSuchElementException during checkpointLogStartOffsets
soarez commented on pull request #10650: URL: https://github.com/apache/kafka/pull/10650#issuecomment-843367411 Hi @dhruvilshah3 This happened when a broker that was offline for a long time came back up. I agree this shouldn't happen in the first place. I have logs, but it isn't clear how this happened. Still, given that it can happen, it seems to me that it is still better to avoid this exception. And I see no downsides in adding the extra check.
[GitHub] [kafka] ijuma commented on pull request #10650: KAFKA-12763 NoSuchElementException during checkpointLogStartOffsets
ijuma commented on pull request #10650: URL: https://github.com/apache/kafka/pull/10650#issuecomment-843373430 @soarez the concern is that it may hide a problem. So, it's important to understand the sequence of events.
[GitHub] [kafka] soarez closed pull request #10195: MINOR: Remove use of deprecated Gradle syntax
soarez closed pull request #10195: URL: https://github.com/apache/kafka/pull/10195
[GitHub] [kafka] soarez commented on pull request #10195: MINOR: Remove use of deprecated Gradle syntax
soarez commented on pull request #10195: URL: https://github.com/apache/kafka/pull/10195#issuecomment-843357496 This was fixed by #10203
[GitHub] [kafka] mumrah commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller
mumrah commented on a change in pull request #10504: URL: https://github.com/apache/kafka/pull/10504#discussion_r634580042 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -2376,6 +2375,82 @@ class KafkaController(val config: KafkaConfig, } } + def allocateProducerIds(allocateProducerIdsRequest: AllocateProducerIdsRequestData, + callback: AllocateProducerIdsResponseData => Unit): Unit = { + +def eventManagerCallback(results: Either[Errors, ProducerIdsBlock]): Unit = { + results match { +case Left(error) => callback.apply(new AllocateProducerIdsResponseData().setErrorCode(error.code)) +case Right(pidBlock) => callback.apply( + new AllocateProducerIdsResponseData() +.setProducerIdStart(pidBlock.producerIdStart()) +.setProducerIdLen(pidBlock.producerIdLen())) + } +} +eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId, + allocateProducerIdsRequest.brokerEpoch, eventManagerCallback)) + } + + def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit): Unit = { +// Handle a few short-circuits +if (!isActive) { + callback.apply(Left(Errors.NOT_CONTROLLER)) + return +} + +val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId) +if (brokerEpochOpt.isEmpty) { + warn(s"Ignoring AllocateProducerIds due to unknown broker $brokerId") + callback.apply(Left(Errors.STALE_BROKER_EPOCH)) Review comment: This error code was added for the quorum controller. I suppose it's fine to use with the ZK controller. WDYT?
[jira] [Commented] (KAFKA-12812) Consider refactoring state store registration path
[ https://issues.apache.org/jira/browse/KAFKA-12812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17347051#comment-17347051 ] Guozhang Wang commented on KAFKA-12812: --- The reason I feel this is better is that it could be less error-prone as we do not have a call loop anymore, whereas in the past we have hit several bugs around this call trace. LMK WDYT [~cadonna] [~ableegoldman] > Consider refactoring state store registration path > -- > > Key: KAFKA-12812 > URL: https://issues.apache.org/jira/browse/KAFKA-12812 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Priority: Major > > Today our state store registration call path within the stateManager (both > local and global) is like this: > {code} > for each store: store.init(store, context) >-> context.register(root, callback) >-> stateManager.registerStore(store, callback) > {code} > One can see that we have an awkward loop from stateManager back to > stateManager, and we require users not to forget to call > context.register(root, callback). We do this only in order to let users pass > the customized callback implementation to the stateManager. > What about a different path like this: > 1) We add a new interface in StateStore, like `StateRestoreCallback > getCallback()` that each impl class needs to provide. > 2) We remove the `context.register(root, callback)` call; and because of > that, we do not need to pass in `root` in the store.init as well. > 3) stateManager just calls `store.init(context)` (without the first > parameter), and then puts the store along with its restore callback into the > map, without the separate `registerStore` function.
[GitHub] [kafka] soarez commented on pull request #10650: KAFKA-12763 NoSuchElementException during checkpointLogStartOffsets
soarez commented on pull request #10650: URL: https://github.com/apache/kafka/pull/10650#issuecomment-843352974 @ijuma can you have a look at this one?
[jira] [Created] (KAFKA-12812) Consider refactoring state store registration path
Guozhang Wang created KAFKA-12812: - Summary: Consider refactoring state store registration path Key: KAFKA-12812 URL: https://issues.apache.org/jira/browse/KAFKA-12812 Project: Kafka Issue Type: Bug Components: streams Reporter: Guozhang Wang Today our state store registration call path within the stateManager (both local and global) is like this: {code} for each store: store.init(store, context) -> context.register(root, callback) -> stateManager.registerStore(store, callback) {code} One can see that we have an awkward loop from stateManager back to stateManager, and we require users not to forget to call context.register(root, callback). We do this only in order to let users pass the customized callback implementation to the stateManager. What about a different path like this: 1) We add a new interface in StateStore, like `StateRestoreCallback getCallback()` that each impl class needs to provide. 2) We remove the `context.register(root, callback)` call; and because of that, we do not need to pass in `root` in the store.init as well. 3) stateManager just calls `store.init(context)` (without the first parameter), and then puts the store along with its restore callback into the map, without the separate `registerStore` function.
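The three proposed steps can be sketched as below. This is only an illustration of the idea under stated assumptions: the interfaces are simplified, hypothetical stand-ins that reuse the names from the ticket (`StateStore`, `StateRestoreCallback`, `getCallback()`), not the real Kafka Streams types, and `StateManagerSketch` and its methods are made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for the Kafka Streams types named in the ticket.
interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface StateStoreContext { }

interface StateStore {
    String name();

    // Step 2: init no longer takes a `root` store and never calls context.register.
    void init(StateStoreContext context);

    // Step 1: each store implementation exposes its own restore callback.
    StateRestoreCallback getCallback();
}

final class StateManagerSketch {
    private final Map<String, StateRestoreCallback> callbacks = new LinkedHashMap<>();

    // Step 3: initialize the store, then record it together with its callback.
    // There is no call loop back into the state manager via the context.
    void initAndRegister(StateStore store, StateStoreContext context) {
        if (callbacks.containsKey(store.name())) {
            throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");
        }
        store.init(context);
        callbacks.put(store.name(), store.getCallback());
    }

    StateRestoreCallback callbackFor(String storeName) {
        return callbacks.get(storeName);
    }
}
```

With this shape a store implementation cannot forget to register itself, because registration is something the state manager does rather than something the store must trigger from inside `init`.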
[jira] [Assigned] (KAFKA-9726) LegacyReplicationPolicy for MM2 to mimic MM1
[ https://issues.apache.org/jira/browse/KAFKA-9726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthew de Detrich reassigned KAFKA-9726: - Assignee: Matthew de Detrich (was: Ivan Yurchenko) > LegacyReplicationPolicy for MM2 to mimic MM1 > > > Key: KAFKA-9726 > URL: https://issues.apache.org/jira/browse/KAFKA-9726 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Ryanne Dolan >Assignee: Matthew de Detrich >Priority: Minor > > Per KIP-382, we should support MM2 in "legacy mode", i.e. with behavior > similar to MM1. A key requirement for this is a ReplicationPolicy that does > not rename topics.
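The key requirement above, a policy that does not rename topics, can be sketched as follows. `TopicNamingPolicy` is a simplified, hypothetical stand-in for MM2's `ReplicationPolicy` interface, and both class names are made up for illustration; only the default `.`-separated prefixing and the no-rename behavior come from KIP-382 and this ticket.

```java
// Simplified, hypothetical interface; the real MM2 ReplicationPolicy has more methods.
interface TopicNamingPolicy {
    String formatRemoteTopic(String sourceClusterAlias, String topic);
}

// MM2-style default naming: prefix the topic with the source cluster alias.
final class PrefixingPolicySketch implements TopicNamingPolicy {
    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return sourceClusterAlias + "." + topic;
    }
}

// Legacy (MM1-like) naming: the replicated topic keeps its original name.
final class LegacyPolicySketch implements TopicNamingPolicy {
    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic;
    }
}
```

Note that without the prefix the target topic name no longer records where a record came from, which is part of the information a real implementation would have to do without.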
[GitHub] [kafka] satishd commented on pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on pull request #10579: URL: https://github.com/apache/kafka/pull/10579#issuecomment-843289802 @junrao @kowshik This PR is ready for review. Please let me know your comments.
[GitHub] [kafka] jlprat commented on a change in pull request #10651: MINOR: Kafka Streams code samples formating unification
jlprat commented on a change in pull request #10651: URL: https://github.com/apache/kafka/pull/10651#discussion_r634513734 ## File path: docs/streams/developer-guide/app-reset-tool.html ## @@ -78,18 +78,17 @@ Step 1: Run the application reset tool Invoke the application reset tool from the command line Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with --dry-run to preview your changes before making them. -/bin/kafka-streams-application-reset - + /bin/kafka-streams-application-reset The tool accepts the following parameters: -Option (* = required) Description +Option (* = required) Description Review comment: Fixed, and also aligned the spaces, so it fit's to the 2 column printing. ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -2482,21 +2440,22 @@ KTable-KTable Foreign-Key KTable that represents the "current" result of the join. (details) - - - KTable left = ...; -KTable right = ...;//This foreignKeyExtractor simply uses the left-value to map to the right-key.Function foreignKeyExtractor = (x) -> x;// Java 8+ example, using lambda expressions -KTable joined = left.join(right, foreignKeyExtractor, -(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */ - ); - - +KTable left = ...; +KTable right = ...; +//This foreignKeyExtractor simply uses the left-value to map to the right-key. +Function foreignKeyExtractor = (x) -> x; + +// Java 8+ example, using lambda expressions +KTable joined = left.join(right, +foreignKeyExtractor, Review comment: Fixed ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -2542,21 +2501,22 @@ KTable-KTable Foreign-Key Performs a foreign-key LEFT JOIN of this table with another table. 
(details) - - - KTable left = ...; -KTable right = ...;//This foreignKeyExtractor simply uses the left-value to map to the right-key.Function foreignKeyExtractor = (x) -> x;// Java 8+ example, using lambda expressions -KTable joined = left.join(right, foreignKeyExtractor, -(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */ - ); - - +KTable left = ...; +KTable right = ...; +//This foreignKeyExtractor simply uses the left-value to map to the right-key. +Function foreignKeyExtractor = (x) -> x; + +// Java 8+ example, using lambda expressions +KTable joined = left.join(right, +foreignKeyExtractor, Review comment: Fixed ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -3207,15 +3161,14 @@ KTable-KTable Foreign-Key terminology in academic literature, where the semantics of sliding windows are different to those of hopping windows. The following code defines a hopping window with a size of 5 minutes and an advance interval of 1 minute: -import java.time.Duration; -import org.apache.kafka.streams.kstream.TimeWindows; - -// A hopping time window with a size of 5 minutes and an advance interval of 1 minute. -// The window's name -- the string parameter -- is used to e.g. name the backing state store. -Duration windowSizeMs = Duration.ofMinutes(5); -Duration advanceMs =Duration.ofMinutes(1); -TimeWindows.of(windowSizeMs).advanceBy(advanceMs); - +import java.time.Duration; +import org.apache.kafka.streams.kstream.TimeWindows; + +// A hopping time window with a size of 5 minutes and an advance interval of 1 minute. +// The window's name -- the string parameter -- is used to e.g. name the backing state store. +Duration windowSizeMs = Duration.ofMinutes(5); +Duration advanceMs =Duration.ofMinutes(1); Review comment: Fixed -- This is an auto
[jira] [Commented] (KAFKA-12811) kafka-topics.sh should let the user know they cannot adjust the replication factor for a topic using the --alter flag and not warn about missing the --partition flag
[ https://issues.apache.org/jira/browse/KAFKA-12811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346980#comment-17346980 ] Richard Joerger commented on KAFKA-12811: - Hi team, I've gone ahead and submitted a patch for this. I'd like to be assigned to it so I can mark it as ready for review. > kafka-topics.sh should let the user know they cannot adjust the replication > factor for a topic using the --alter flag and not warn about missing the > --partition flag > - > > Key: KAFKA-12811 > URL: https://issues.apache.org/jira/browse/KAFKA-12811 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Richard Joerger >Priority: Trivial > > Currently the kafka-topics.sh tool will return the following message when > trying to alter the replication factor for a topic: > > {code:java} > $ ./bin/kafka-topics.sh --bootstrap-server > {code} > _host_ > {code:java} > :9092 --alter --topic testTopic --replication-factor 4 > 21/05/18 13:23:54 INFO utils.Log4jControllerRegistration$: Registered > kafka:type=kafka.Log4jController MBean > Missing required argument "[partitions]" > {code} > But, what I believe it should do, since the appropriate way to adjust > replication factor is to use the kafka-reassign-partitions tool should be to > let the user know that this is not the appropriate way to adjust the > replication factor. It would look like this: > {code:java} > $ ./bin/kafka-topics.sh --bootstrap-server > {code} > _host_ > {code:java} > :9092 --alter --topic testTopic --replication-factor 4 > Option "[replication-factor]" can't be used with option "[alter]"{code} > Very minor improvement but should help alleviate what seems a little > unintuitive to some. >
[GitHub] [kafka] Moovlin opened a new pull request #10718: [KAFKA-12811] kafka-topics.sh should let the user know they cannot adjust the replication factor for a topic using the --alter flag and not
Moovlin opened a new pull request #10718: URL: https://github.com/apache/kafka/pull/10718 Added 1 new line to check that "replication factor" & "alter" are not being used together. Uses the existing CommandLineUtils to do the check. Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
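The kind of check described above can be sketched like this. The real change uses Kafka's existing `CommandLineUtils`; the helper below is a self-contained, hypothetical stand-in written for illustration (options are plain strings rather than parsed option specs), and only the error message format is taken from KAFKA-12811.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in; not the signature of Kafka's CommandLineUtils.
final class InvalidOptionCheckSketch {
    // Returns an error message if `usedOption` is present together with any
    // option listed in `invalidWith`; otherwise returns an empty Optional.
    static Optional<String> checkInvalidArgs(Set<String> present,
                                             String usedOption,
                                             List<String> invalidWith) {
        if (!present.contains(usedOption)) {
            return Optional.empty();
        }
        for (String other : invalidWith) {
            if (present.contains(other)) {
                return Optional.of("Option \"[" + usedOption
                        + "]\" can't be used with option \"[" + other + "]\"");
            }
        }
        return Optional.empty();
    }
}
```

Running the check before the required-argument validation is what would let the user see the incompatibility message rather than the misleading complaint about a missing `partitions` argument.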
[jira] [Commented] (KAFKA-9726) LegacyReplicationPolicy for MM2 to mimic MM1
[ https://issues.apache.org/jira/browse/KAFKA-9726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346969#comment-17346969 ] Matthew de Detrich commented on KAFKA-9726: --- Would it be possible to assign the ticket to me? I am taking over the task from [~ivanyu] (I have already created a PR at [https://github.com/apache/kafka/pull/10648]). [~ivanyu] has tried changing the assignee to me but it didn't work. > LegacyReplicationPolicy for MM2 to mimic MM1 > > > Key: KAFKA-9726 > URL: https://issues.apache.org/jira/browse/KAFKA-9726 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Ryanne Dolan >Assignee: Ivan Yurchenko >Priority: Minor > > Per KIP-382, we should support MM2 in "legacy mode", i.e. with behavior > similar to MM1. A key requirement for this is a ReplicationPolicy that does > not rename topics.
[GitHub] [kafka] rondagostino commented on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft
rondagostino commented on pull request #10550: URL: https://github.com/apache/kafka/pull/10550#issuecomment-843228313 @cmccabe I merged in trunk and pushed a separate commit to fix the failing unit tests. The way we test to ensure an API is always forwarded for KRaft is to send the request directly through into `KafkaApis` as though it wasn't forwarded and assert that we get `UnsupportedVersionException`. You had inadvertently short-circuited our ability to send the request through to `KafkaApis` by invoking the new `AclApis` directly in the `request.header.apiKey match {` block. I fixed the tests by re-adding the standard `handle...()` methods and have them check the metadata support status if necessary and then invoke `AclApis` -- that way we still have the check that we are using ZooKeeper and not KRaft, and the tests can get the expected exception and pass.
[jira] [Created] (KAFKA-12811) kafka-topics.sh should let the user know they cannot adjust the replication factor for a topic using the --alter flag and not warn about missing the --partition flag
Richard Joerger created KAFKA-12811: --- Summary: kafka-topics.sh should let the user know they cannot adjust the replication factor for a topic using the --alter flag and not warn about missing the --partition flag Key: KAFKA-12811 URL: https://issues.apache.org/jira/browse/KAFKA-12811 Project: Kafka Issue Type: Improvement Components: admin Reporter: Richard Joerger Currently the kafka-topics.sh tool returns the following message when trying to alter the replication factor for a topic:
{code:java}
$ ./bin/kafka-topics.sh --bootstrap-server _host_:9092 --alter --topic testTopic --replication-factor 4
21/05/18 13:23:54 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
Missing required argument "[partitions]"
{code}
Since the appropriate way to adjust the replication factor is the kafka-reassign-partitions tool, I believe kafka-topics.sh should instead tell the user that --alter is not the appropriate way to adjust the replication factor. It would look like this:
{code:java}
$ ./bin/kafka-topics.sh --bootstrap-server _host_:9092 --alter --topic testTopic --replication-factor 4
Option "[replication-factor]" can't be used with option "[alter]"
{code}
This is a very minor improvement, but it should help alleviate what seems a little unintuitive to some. -- This message was sent by Atlassian Jira (v8.3.4#803005)
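A minimal sketch of the check ordering the ticket asks for, assuming the flags have already been parsed into a set of names without leading dashes. `TopicCommandOptionCheck` and `validateAlter` are hypothetical names, not part of the kafka-topics.sh implementation; only the two message strings are taken from the ticket.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical helper, not the actual kafka-topics.sh option parser.
final class TopicCommandOptionCheck {

    // Returns an error message for an invalid flag combination, or empty
    // if both checks pass. The incompatible-option check runs first, so
    // the user never sees the misleading "partitions" message for it.
    static Optional<String> validateAlter(Set<String> flags) {
        if (flags.contains("alter") && flags.contains("replication-factor")) {
            return Optional.of(
                "Option \"[replication-factor]\" can't be used with option \"[alter]\"");
        }
        if (flags.contains("alter") && !flags.contains("partitions")) {
            return Optional.of("Missing required argument \"[partitions]\"");
        }
        return Optional.empty();
    }
}
```

The design point is only the order of the two checks: reporting the incompatible option before the missing one gives the user the actionable message.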
[GitHub] [kafka] jlprat commented on pull request #10651: MINOR: Kafka Streams code samples formating unification
jlprat commented on pull request #10651: URL: https://github.com/apache/kafka/pull/10651#issuecomment-843207783 Thanks @cadonna, all good points. I'll address the feedback later
[GitHub] [kafka] cadonna commented on a change in pull request #10651: MINOR: Kafka Streams code samples formating unification
cadonna commented on a change in pull request #10651: URL: https://github.com/apache/kafka/pull/10651#discussion_r634203662 ## File path: docs/streams/developer-guide/app-reset-tool.html ## @@ -78,18 +78,17 @@ Step 1: Run the application reset tool Invoke the application reset tool from the command line Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with --dry-run to preview your changes before making them. -/bin/kafka-streams-application-reset - + /bin/kafka-streams-application-reset The tool accepts the following parameters: -Option (* = required) Description +Option (* = required) Description Review comment: Class `language-bash` renders the text with strange colors, e.g., the keyword "file" which is not a keyword here. Since this is actually not a bash script code, I think we leave it as plain monospaced text. ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -2542,21 +2501,22 @@ KTable-KTable Foreign-Key Performs a foreign-key LEFT JOIN of this table with another table. (details) - - - KTable left = ...; -KTable right = ...;//This foreignKeyExtractor simply uses the left-value to map to the right-key.Function foreignKeyExtractor = (x) -> x;// Java 8+ example, using lambda expressions -KTable joined = left.join(right, foreignKeyExtractor, -(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */ - ); - - +KTable left = ...; +KTable right = ...; +//This foreignKeyExtractor simply uses the left-value to map to the right-key. +Function foreignKeyExtractor = (x) -> x; + +// Java 8+ example, using lambda expressions +KTable joined = left.join(right, +foreignKeyExtractor, Review comment: Could you please put `foreignKeyExtractor` on the previous line or indent it so that it is at the same column as `right`? 
## File path: docs/streams/developer-guide/dsl-api.html ## @@ -3207,15 +3161,14 @@ KTable-KTable Foreign-Key terminology in academic literature, where the semantics of sliding windows are different to those of hopping windows. The following code defines a hopping window with a size of 5 minutes and an advance interval of 1 minute: -import java.time.Duration; -import org.apache.kafka.streams.kstream.TimeWindows; - -// A hopping time window with a size of 5 minutes and an advance interval of 1 minute. -// The window's name -- the string parameter -- is used to e.g. name the backing state store. -Duration windowSizeMs = Duration.ofMinutes(5); -Duration advanceMs =Duration.ofMinutes(1); -TimeWindows.of(windowSizeMs).advanceBy(advanceMs); - +import java.time.Duration; +import org.apache.kafka.streams.kstream.TimeWindows; + +// A hopping time window with a size of 5 minutes and an advance interval of 1 minute. +// The window's name -- the string parameter -- is used to e.g. name the backing state store. +Duration windowSizeMs = Duration.ofMinutes(5); +Duration advanceMs =Duration.ofMinutes(1); Review comment: Could you fix the indentation here to `Duration advanceMs = Duration.ofMinutes(1);`? ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -2482,21 +2440,22 @@ KTable-KTable Foreign-Key KTable that represents the "current" result of the join. (details) - - - KTable left = ...; -KTable right = ...;//This foreignKeyExtractor simply uses the left-value to map to the right-key.Function foreignKeyExtractor = (x) -> x;// Java 8+ example, using lambda expressions -KTable joined = left.join(right, foreignKeyExtractor, -(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */ - ); - - +KTable left = ...; +KTable right
[jira] [Commented] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss
[ https://issues.apache.org/jira/browse/KAFKA-12801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346920#comment-17346920 ] Lucas Bradstreet commented on KAFKA-12801: -- JFR is perfectly fine too. > High CPU load after restarting brokers subsequent to quorum loss > > > Key: KAFKA-12801 > URL: https://issues.apache.org/jira/browse/KAFKA-12801 > Project: Kafka > Issue Type: Bug > Components: core, KafkaConnect >Affects Versions: 2.8.0 >Reporter: Gunnar Morling >Priority: Major > > I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of > three Kafka nodes (all combined nodes), and one Kafka Connect node. After > starting all components, I first stop the current controller of the Kafka > cluster, then I stop the then controller of the Kafka cluster. At this point, > only one Kafka node out of the original three and Connect is running. > When now restarting the two stopped Kafka nodes, CPU load on the Connect node > and the two broker nodes goes up to 100% and remains at that level for an > indefinite amount of time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mumrah merged pull request #10561: KAFKA-12686 AlterIsr and LeaderAndIsr race condition
mumrah merged pull request #10561: URL: https://github.com/apache/kafka/pull/10561
[GitHub] [kafka] showuon commented on pull request #10457: KAFKA-12596: remove --zookeeper option from topic command
showuon commented on pull request #10457: URL: https://github.com/apache/kafka/pull/10457#issuecomment-843191316 failed tests are unrelated and flaky. Thanks.
```
Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInSinkTaskStart
Build / JDK 15 and Scala 2.13 / org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.testTopicsAreCreatedWhenAutoCreateTopicsIsEnabledAtTheBroker
Build / JDK 11 and Scala 2.13 / kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete()
```
[jira] [Commented] (KAFKA-12419) Remove Deprecated APIs of Kafka Streams in 3.0
[ https://issues.apache.org/jira/browse/KAFKA-12419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346902#comment-17346902 ] Josep Prat commented on KAFKA-12419: I created https://issues.apache.org/jira/browse/KAFKA-12808 and https://issues.apache.org/jira/browse/KAFKA-12809 as separate issues in case they can't be removed for any other reason, so they can be discussed separately. I'm still working through creating issues for the remaining deprecated methods I found. > Remove Deprecated APIs of Kafka Streams in 3.0 > -- > > Key: KAFKA-12419 > URL: https://issues.apache.org/jira/browse/KAFKA-12419 > Project: Kafka > Issue Type: Improvement > Components: streams, streams-test-utils >Reporter: Guozhang Wang >Assignee: Tomasz Nguyen >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > Here's a list of deprecated APIs that we have accumulated in the past, we can > consider removing them in 3.0: > * KIP-198: "--zookeeper" flag from StreamsResetter (1.0) > * KIP-171: "--execute" flag from StreamsResetter (1.1) > * KIP-233: overloaded "StreamsBuilder#addGlobalStore" (1.1) > * KIP-251: overloaded "ProcessorContext#forward" (2.0) > * KIP-276: "StreamsConfig#getConsumerConfig" (2.0) > * KIP-319: "WindowBytesStoreSupplier#segments" (2.1) > * KIP-321: "TopologyDescription.Source#topics" (2.1) > * KIP-328: "Windows#until/segmentInterval/maintainMS" (2.1) > * KIP-358: "Windows/Materialized" overloaded functions with `long` (2.1) > * KIP-365/366: Implicit Scala Apis (2.1) > * KIP-372: overloaded "KStream#groupBy" (2.1) > * KIP-307: "Joined#named" (2.3) > * KIP-345: Broker config "group.initial.rebalance.delay.ms" (2.3) > * KIP-429: "PartitionAssignor" interface (2.4) > * KIP-470: "TopologyTestDriver#pipeInput" (2.4) > * KIP-476: overloaded "KafkaClientSupplier#getAdminClient" (2.4) > * KIP-479: overloaded "KStream#join" (2.4) > * KIP-530: old "UsePreviousTimeOnInvalidTimeStamp" (2.5) > * KIP-535 / 562: overloaded "KafkaStreams#metadataForKey" and > "KafkaStreams#store" (2.5) > And here's a list of already filed JIRAs for removing deprecated APIs > * KAFKA-10434 > * KAFKA-7785 > * KAFKA-12796 > * KAFKA-12809 > * KAFKA-12808 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12419) Remove Deprecated APIs of Kafka Streams in 3.0
[ https://issues.apache.org/jira/browse/KAFKA-12419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat updated KAFKA-12419: --- Description: Here's a list of deprecated APIs that we have accumulated in the past, we can consider removing them in 3.0: * KIP-198: "--zookeeper" flag from StreamsResetter (1.0) * KIP-171: "--execute" flag from StreamsResetter (1.1) * KIP-233: overloaded "StreamsBuilder#addGlobalStore" (1.1) * KIP-251: overloaded "ProcessorContext#forward" (2.0) * KIP-276: "StreamsConfig#getConsumerConfig" (2.0) * KIP-319: "WindowBytesStoreSupplier#segments" (2.1) * KIP-321: "TopologyDescription.Source#topics" (2.1) * KIP-328: "Windows#until/segmentInterval/maintainMS" (2.1) * KIP-358: "Windows/Materialized" overloaded functions with `long` (2.1) * KIP-365/366: Implicit Scala Apis (2.1) * KIP-372: overloaded "KStream#groupBy" (2.1) * KIP-307: "Joined#named" (2.3) * KIP-345: Broker config "group.initial.rebalance.delay.ms" (2.3) * KIP-429: "PartitionAssignor" interface (2.4) * KIP-470: "TopologyTestDriver#pipeInput" (2.4) * KIP-476: overloaded "KafkaClientSupplier#getAdminClient" (2.4) * KIP-479: overloaded "KStream#join" (2.4) * KIP-530: old "UsePreviousTimeOnInvalidTimeStamp" (2.5) * KIP-535 / 562: overloaded "KafkaStreams#metadataForKey" and "KafkaStreams#store" (2.5) And here's a list of already filed JIRAs for removing deprecated APIs * KAFKA-10434 * KAFKA-7785 * KAFKA-12796 * KAFKA-12809 * KAFKA-12808 was: Here's a list of deprecated APIs that we have accumulated in the past, we can consider removing them in 3.0: * KIP-198: "--zookeeper" flag from StreamsResetter (1.0) * KIP-171: "--execute" flag from StreamsResetter (1.1) * KIP-233: overloaded "StreamsBuilder#addGlobalStore" (1.1) * KIP-251: overloaded "ProcessorContext#forward" (2.0) * KIP-276: "StreamsConfig#getConsumerConfig" (2.0) * KIP-319: "WindowBytesStoreSupplier#segments" (2.1) * KIP-321: "TopologyDescription.Source#topics" (2.1) * KIP-328: "Windows#until/segmentInterval/maintainMS" (2.1) * KIP-358: "Windows/Materialized" overloaded functions with `long` (2.1) * KIP-365/366: Implicit Scala Apis (2.1) * KIP-372: overloaded "KStream#groupBy" (2.1) * KIP-307: "Joined#named" (2.3) * KIP-345: Broker config "group.initial.rebalance.delay.ms" (2.3) * KIP-429: "PartitionAssignor" interface (2.4) * KIP-470: "TopologyTestDriver#pipeInput" (2.4) * KIP-476: overloaded "KafkaClientSupplier#getAdminClient" (2.4) * KIP-479: overloaded "KStream#join" (2.4) * KIP-530: old "UsePreviousTimeOnInvalidTimeStamp" (2.5) * KIP-535 / 562: overloaded "KafkaStreams#metadataForKey" and "KafkaStreams#store" (2.5) And here's a list of already filed JIRAs for removing deprecated APIs * KAFKA-10434 * KAFKA-7785 * KAFKA-12796 > Remove Deprecated APIs of Kafka Streams in 3.0 > -- > > Key: KAFKA-12419 > URL: https://issues.apache.org/jira/browse/KAFKA-12419 > Project: Kafka > Issue Type: Improvement > Components: streams, streams-test-utils >Reporter: Guozhang Wang >Assignee: Tomasz Nguyen >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > Here's a list of deprecated APIs that we have accumulated in the past, we can > consider removing them in 3.0: > * KIP-198: "--zookeeper" flag from StreamsResetter (1.0) > * KIP-171: "--execute" flag from StreamsResetter (1.1) > * KIP-233: overloaded "StreamsBuilder#addGlobalStore" (1.1) > * KIP-251: overloaded "ProcessorContext#forward" (2.0) > * KIP-276: "StreamsConfig#getConsumerConfig" (2.0) > * KIP-319: "WindowBytesStoreSupplier#segments" (2.1) > * KIP-321: "TopologyDescription.Source#topics" (2.1) > * KIP-328: "Windows#until/segmentInterval/maintainMS" (2.1) > * KIP-358: "Windows/Materialized" overloaded functions with `long` (2.1) > * KIP-365/366: Implicit Scala Apis (2.1) > * KIP-372: overloaded "KStream#groupBy" (2.1) > * KIP-307: "Joined#named" (2.3) > * KIP-345: Broker config "group.initial.rebalance.delay.ms" (2.3) > * KIP-429: "PartitionAssignor" interface (2.4) > * KIP-470: "TopologyTestDriver#pipeInput" (2.4) > * KIP-476: overloaded "KafkaClientSupplier#getAdminClient" (2.4) > * KIP-479: overloaded "KStream#join" (2.4) > * KIP-530: old "UsePreviousTimeOnInvalidTimeStamp" (2.5) > * KIP-535 / 562: overloaded "KafkaStreams#metadataForKey" and > "KafkaStreams#store" (2.5) > And here's a list of already filed JIRAs for removing deprecated APIs > * KAFKA-10434 > * KAFKA-7785 > * KAFKA-12796 > * KAFKA-12809 > * KAFKA-12808 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12810) Remove deprecated TopologyDescription.Source#topics
Josep Prat created KAFKA-12810: -- Summary: Remove deprecated TopologyDescription.Source#topics Key: KAFKA-12810 URL: https://issues.apache.org/jira/browse/KAFKA-12810 Project: Kafka Issue Type: Sub-task Reporter: Josep Prat Fix For: 3.0.0 As identified on https://issues.apache.org/jira/browse/KAFKA-12419 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12808) Remove Deprecated methods under StreamsMetrics
[ https://issues.apache.org/jira/browse/KAFKA-12808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat updated KAFKA-12808: --- Description: There are 4 methods each in StreamsMetrics and StreamsMetricsImpl that are deprecated since 2.5: * org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#recordLatency * org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#recordThroughput * org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#addLatencyAndThroughputSensor * org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#addThroughputSensor * org.apache.kafka.streams.StreamsMetrics#addLatencyAndThroughputSensor * org.apache.kafka.streams.StreamsMetrics#recordLatency * org.apache.kafka.streams.StreamsMetrics#addThroughputSensor * org.apache.kafka.streams.StreamsMetrics#recordThroughput As far as I understand, they are all deprecated for long enough to be removed for version 3.0. Those methods were deprecated during task: https://issues.apache.org/jira/browse/KAFKA-9230 was: There are 4 methods in StreamsMetricsImpl that are deprecated since 2.5: * org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#recordLatency * org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#recordThroughput * org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#addLatencyAndThroughputSensor * org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#addThroughputSensor As far as I understand, they are all deprecated for long enough to be removed for version 3.0. Those methods were deprecated during task: https://issues.apache.org/jira/browse/KAFKA-9230 > Remove Deprecated methods under StreamsMetrics > -- > > Key: KAFKA-12808 > URL: https://issues.apache.org/jira/browse/KAFKA-12808 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Josep Prat >Assignee: Josep Prat >Priority: Blocker > Fix For: 3.0.0 > > > There are 4 methods each in StreamsMetrics and StreamsMetricsImpl that are > deprecated since 2.5: > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#recordLatency > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#recordThroughput > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#addLatencyAndThroughputSensor > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#addThroughputSensor > * org.apache.kafka.streams.StreamsMetrics#addLatencyAndThroughputSensor > * org.apache.kafka.streams.StreamsMetrics#recordLatency > * org.apache.kafka.streams.StreamsMetrics#addThroughputSensor > * org.apache.kafka.streams.StreamsMetrics#recordThroughput > As far as I understand, they are all deprecated for long enough to be removed > for version 3.0. > Those methods were deprecated during task: > https://issues.apache.org/jira/browse/KAFKA-9230 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12808) Remove Deprecated methods under StreamsMetrics
[ https://issues.apache.org/jira/browse/KAFKA-12808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat updated KAFKA-12808: --- Summary: Remove Deprecated methods under StreamsMetrics (was: Remove Deprecated methods under StreamsMetricsImpl) > Remove Deprecated methods under StreamsMetrics > -- > > Key: KAFKA-12808 > URL: https://issues.apache.org/jira/browse/KAFKA-12808 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Josep Prat >Assignee: Josep Prat >Priority: Blocker > Fix For: 3.0.0 > > > There are 4 methods in StreamsMetricsImpl that are deprecated since 2.5: > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#recordLatency > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#recordThroughput > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#addLatencyAndThroughputSensor > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#addThroughputSensor > As far as I understand, they are all deprecated for long enough to be removed > for version 3.0. > Those methods were deprecated during task: > https://issues.apache.org/jira/browse/KAFKA-9230 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12809) Remove Deprecated methods under Stores
[ https://issues.apache.org/jira/browse/KAFKA-12809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat updated KAFKA-12809: --- Component/s: streams Fix Version/s: 3.0.0 Description: There are 2 methods in org.apache.kafka.streams.state.Stores that are deprecated since 2.1: * org.apache.kafka.streams.state.Stores#persistentWindowStore(java.lang.String, long, int, long, boolean) * org.apache.kafka.streams.state.Stores#persistentSessionStore(java.lang.String, long) As far as I understand, they are all deprecated for long enough to be removed for version 3.0. Those methods were deprecated during PR: https://github.com/apache/kafka/pull/6290 Priority: Blocker (was: Major) Summary: Remove Deprecated methods under Stores (was: Remove De) > Remove Deprecated methods under Stores > -- > > Key: KAFKA-12809 > URL: https://issues.apache.org/jira/browse/KAFKA-12809 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Josep Prat >Priority: Blocker > Fix For: 3.0.0 > > > There are 2 methods in org.apache.kafka.streams.state.Stores that are > deprecated since 2.1: > * > org.apache.kafka.streams.state.Stores#persistentWindowStore(java.lang.String, > long, int, long, boolean) > * > org.apache.kafka.streams.state.Stores#persistentSessionStore(java.lang.String, > long) > As far as I understand, they are all deprecated for long enough to be removed > for version 3.0. > Those methods were deprecated during PR: > https://github.com/apache/kafka/pull/6290 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12809) Remove De
Josep Prat created KAFKA-12809: -- Summary: Remove De Key: KAFKA-12809 URL: https://issues.apache.org/jira/browse/KAFKA-12809 Project: Kafka Issue Type: Task Reporter: Josep Prat -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12808) Remove Deprecated methods under StreamsMetricsImpl
[ https://issues.apache.org/jira/browse/KAFKA-12808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346891#comment-17346891 ] Josep Prat commented on KAFKA-12808: I'll take this one > Remove Deprecated methods under StreamsMetricsImpl > -- > > Key: KAFKA-12808 > URL: https://issues.apache.org/jira/browse/KAFKA-12808 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Josep Prat >Assignee: Josep Prat >Priority: Blocker > Fix For: 3.0.0 > > > There are 4 methods in StreamsMetricsImpl that are deprecated since 2.5: > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#recordLatency > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#recordThroughput > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#addLatencyAndThroughputSensor > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#addThroughputSensor > As far as I understand, they are all deprecated for long enough to be removed > for version 3.0. > Those methods were deprecated during task: > https://issues.apache.org/jira/browse/KAFKA-9230 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12808) Remove Deprecated methods under StreamsMetricsImpl
[ https://issues.apache.org/jira/browse/KAFKA-12808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat reassigned KAFKA-12808: -- Assignee: Josep Prat > Remove Deprecated methods under StreamsMetricsImpl > -- > > Key: KAFKA-12808 > URL: https://issues.apache.org/jira/browse/KAFKA-12808 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Josep Prat >Assignee: Josep Prat >Priority: Blocker > Fix For: 3.0.0 > > > There are 4 methods in StreamsMetricsImpl that are deprecated since 2.5: > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#recordLatency > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#recordThroughput > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#addLatencyAndThroughputSensor > * > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#addThroughputSensor > As far as I understand, they are all deprecated for long enough to be removed > for version 3.0. > Those methods were deprecated during task: > https://issues.apache.org/jira/browse/KAFKA-9230 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12808) Remove Deprecated methods under StreamsMetricsImpl
Josep Prat created KAFKA-12808: -- Summary: Remove Deprecated methods under StreamsMetricsImpl Key: KAFKA-12808 URL: https://issues.apache.org/jira/browse/KAFKA-12808 Project: Kafka Issue Type: Task Components: streams Reporter: Josep Prat Fix For: 3.0.0 There are 4 methods in StreamsMetricsImpl that are deprecated since 2.5: * org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#recordLatency * org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#recordThroughput * org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#addLatencyAndThroughputSensor * org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl#addThroughputSensor As far as I understand, they are all deprecated for long enough to be removed for version 3.0. Those methods were deprecated during task: https://issues.apache.org/jira/browse/KAFKA-9230 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order
tombentley commented on pull request #9441: URL: https://github.com/apache/kafka/pull/9441#issuecomment-843148293 @hachikuji I fixed the tests and made your suggested changes, but what do you think about @guozhangwang's point [above](https://github.com/apache/kafka/pull/9441#discussion_r624219744)?
[jira] [Created] (KAFKA-12807) allow mTLS authorization based on different fields of X509 certificate
Naresh created KAFKA-12807: -- Summary: allow mTLS authorization based on different fields of X509 certificate Key: KAFKA-12807 URL: https://issues.apache.org/jira/browse/KAFKA-12807 Project: Kafka Issue Type: Improvement Reporter: Naresh The built-in simple authorizer uses the X500Principal to authorize mTLS principals. Other certificate fields, such as the SAN (Subject Alternative Name) or the serial number, could also be used for authorization. Adding authorization based on the SAN would help break the dependency on the CommonName in environments where the CommonName holds an autogenerated hostname while the same certificates are used for client authentication with the Kafka brokers. -- This message was sent by Atlassian Jira (v8.3.4#803005)
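To illustrate what SAN-based identification could look like, here is a small sketch that picks the DNS names out of the collection returned by the JDK's `X509Certificate#getSubjectAlternativeNames()`. The class and method names (`SanPrincipalSketch`, `dnsNames`) are hypothetical, and how such names would be wired into a Kafka authorizer or principal builder is not covered by the ticket and is left out here.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical helper: extracts dNSName entries from a SAN collection in the
// shape returned by java.security.cert.X509Certificate#getSubjectAlternativeNames().
final class SanPrincipalSketch {

    // GeneralName type tag for dNSName (RFC 5280).
    private static final int DNS_NAME = 2;

    static List<String> dnsNames(Collection<List<?>> subjectAltNames) {
        List<String> names = new ArrayList<>();
        if (subjectAltNames == null) {
            return names; // the JDK returns null when the extension is absent
        }
        for (List<?> entry : subjectAltNames) {
            // Each entry is [Integer type, value]; dNSName values are Strings.
            if (entry.size() >= 2
                    && Integer.valueOf(DNS_NAME).equals(entry.get(0))
                    && entry.get(1) instanceof String) {
                names.add((String) entry.get(1));
            }
        }
        return names;
    }
}
```

With a real certificate this would be called as `SanPrincipalSketch.dnsNames(cert.getSubjectAlternativeNames())`; that call can throw `CertificateParsingException`, which the caller would need to handle.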
[GitHub] [kafka] NLincoln opened a new pull request #10717: KAFKA-12800: Configure generator to fail on trailing JSON tokens
NLincoln opened a new pull request #10717: URL: https://github.com/apache/kafka/pull/10717 See #10709 for an example of this happening. The tl;dr is that Jackson will ignore trailing tokens by default, but other JSON parsers cannot be configured to ignore them. This makes sure we don't regress :). # Testing I ran `./gradlew processMessages` and saw that everything completed successfully. I then put a trailing `}` into one of the files and saw that `processMessages` failed. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-12806) KRaft: Confusing leadership status exposed in metrics for controller without quorum
[ https://issues.apache.org/jira/browse/KAFKA-12806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-12806: --- Summary: KRaft: Confusing leadership status exposed in metrics for controller without quorum (was: KRaft: Confusing leadership status exposed for controller without quorum) > KRaft: Confusing leadership status exposed in metrics for controller without > quorum > --- > > Key: KAFKA-12806 > URL: https://issues.apache.org/jira/browse/KAFKA-12806 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Gunnar Morling >Priority: Major > > I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of > three Kafka nodes (all combined nodes). After starting all components, I > first stop the current controller of the Kafka cluster, then I stop the then > controller of the Kafka cluster. At this point, only one Kafka node out of > the original three and Connect is running. In the new KRaft-based metrics, > "leader" is exposed as the role for that node, and its id is shown as the > current leader. Also in the metadata shell, that node is shown as the quorum > leader via /metadataQuorum/leader. This is pretty confusing, as one out of > three nodes cannot have the quorum. I believe this is mostly an issue of > displaying the status, as for instance creating a topic in this state times > out. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
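The arithmetic behind "one out of three nodes cannot have the quorum" can be written down as a short sketch. It assumes a plain majority quorum, as in Raft-style consensus; `QuorumMath` is a hypothetical helper and not part of Kafka.

```java
// Hypothetical helper illustrating majority-quorum arithmetic.
final class QuorumMath {

    // Smallest number of voters that forms a strict majority.
    static int majority(int voters) {
        return voters / 2 + 1;
    }

    // True if the live voters are enough to form a majority.
    static boolean hasQuorum(int liveVoters, int voters) {
        return liveVoters >= majority(voters);
    }
}
```

For the three-node cluster in the report, `majority(3)` is 2, so a single surviving node falls short of a quorum, while two live nodes would suffice.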
[jira] [Updated] (KAFKA-12806) KRaft: Confusing leadership status exposed for controller without quorum
[ https://issues.apache.org/jira/browse/KAFKA-12806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-12806: --- Description: I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of three Kafka nodes (all combined nodes). After starting all components, I first stop the current controller of the Kafka cluster, then I stop the then controller of the Kafka cluster. At this point, only one Kafka node out of the original three and Connect is running. In the new KRaft-based metrics, "leader" is exposed as the role for that node, and its id is shown as the current leader. Also in the metadata shell, that node is shown as the quorum leader via /metadataQuorum/leader. This is pretty confusing, as one out of three nodes cannot have the quorum. I believe this is mostly an issue of displaying the status, as for instance creating a topic in this state times out. > KRaft: Confusing leadership status exposed for controller without quorum > > > Key: KAFKA-12806 > URL: https://issues.apache.org/jira/browse/KAFKA-12806 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Gunnar Morling >Priority: Major > > I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of > three Kafka nodes (all combined nodes). After starting all components, I > first stop the current controller of the Kafka cluster, then I stop the then > controller of the Kafka cluster. At this point, only one Kafka node out of > the original three and Connect is running. In the new KRaft-based metrics, > "leader" is exposed as the role for that node, and its id is shown as the > current leader. Also in the metadata shell, that node is shown as the quorum > leader via /metadataQuorum/leader. This is pretty confusing, as one out of > three nodes cannot have the quorum. I believe this is mostly an issue of > displaying the status, as for instance creating a topic in this state times > out. 
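To make the arithmetic behind "one out of three nodes cannot have the quorum" concrete, here is a minimal sketch of the majority rule used by Raft-style quorums. The class and method names (`QuorumSketch`, `majority`, `hasQuorum`) are hypothetical and are not taken from the Kafka code base; the sketch only illustrates the counting, not how KRaft reports leadership.

```java
// Hypothetical illustration only; not Kafka's KRaft implementation.
final class QuorumSketch {

    /** Smallest number of voters that forms a strict majority of the voter set. */
    static int majority(int voters) {
        return voters / 2 + 1;
    }

    /** True if the reachable voters (including this node) form a majority. */
    static boolean hasQuorum(int voters, int reachable) {
        return reachable >= majority(voters);
    }

    public static void main(String[] args) {
        // Three voters, as in the report: two are needed for a majority.
        System.out.println("majority(3) = " + majority(3));
        // Only one node left running: no majority.
        System.out.println("hasQuorum(3, 1) = " + hasQuorum(3, 1));
        // Two nodes running: majority restored.
        System.out.println("hasQuorum(3, 2) = " + hasQuorum(3, 2));
    }
}
```

With three voters a leader needs acknowledgements from two, so a single surviving node cannot commit new metadata, which is consistent with the topic creation timing out in the report.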
[jira] [Created] (KAFKA-12806) KRaft: Confusing leadership status exposed for controller without quorum
Gunnar Morling created KAFKA-12806: -- Summary: KRaft: Confusing leadership status exposed for controller without quorum Key: KAFKA-12806 URL: https://issues.apache.org/jira/browse/KAFKA-12806 Project: Kafka Issue Type: Bug Affects Versions: 2.8.0 Reporter: Gunnar Morling -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss
[ https://issues.apache.org/jira/browse/KAFKA-12801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346835#comment-17346835 ] Gunnar Morling commented on KAFKA-12801: Would a JFR profiling work, too? I could provide that one a bit more easily, as I'm more familiar with this tool. > High CPU load after restarting brokers subsequent to quorum loss > > > Key: KAFKA-12801 > URL: https://issues.apache.org/jira/browse/KAFKA-12801 > Project: Kafka > Issue Type: Bug > Components: core, KafkaConnect >Affects Versions: 2.8.0 >Reporter: Gunnar Morling >Priority: Major > > I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of > three Kafka nodes (all combined nodes), and one Kafka Connect node. After > starting all components, I first stop the current controller of the Kafka > cluster, then I stop the then controller of the Kafka cluster. At this point, > only one Kafka node out of the original three and Connect is running. > When now restarting the two stopped Kafka nodes, CPU load on the Connect node > and the two broker nodes goes up to 100% and remains at that level for an > indefinite amount of time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence
[ https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346805#comment-17346805 ] NEERAJ VAIDYA commented on KAFKA-12776: --- [~guozhang] I am keen to get such a feature into the API and ready to work on a KIP, if that is what is needed. > Producer sends messages out-of-order inspite of enabling idempotence > > > Key: KAFKA-12776 > URL: https://issues.apache.org/jira/browse/KAFKA-12776 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0, 2.7.0 > Environment: Linux RHEL 7.9 and Ubuntu 20.04 >Reporter: NEERAJ VAIDYA >Priority: Major > Attachments: mocker.zip > > > I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). > My application is basically a Spring Boot web application which accepts JSON > payloads via HTTP and then pushes each to a Kafka topic. I also use Spring > Cloud Stream Kafka in the application to create and use a Producer. > For one of my failure handling test cases, I shut down the Kafka cluster while > my applications are running. (Note: No messages have been published to the > Kafka cluster before I stop the cluster.) > When the producer application tries to write messages to TA, it cannot > because the cluster is down and hence (I assume) buffers the messages. Let's > say it receives 4 messages m1,m2,m3,m4 in increasing time order (i.e. m1 is > first and m4 is last). > When I bring the Kafka cluster back online, the producer sends the buffered > messages to the topic, but they are not in order. I receive, for example, m2 > then m3 then m1 and then m4. > Why is that? Is it because the buffering in the producer is multi-threaded > with each producing to the topic at the same time? > My project code is attached herewith. > I can confirm that I have enabled idempotence. I have also tried with > ```max.in.flight.requests=1``` -- This message was sent by Atlassian Jira (v8.3.4#803005)
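For readers who want to reproduce the setup described above, the following is a minimal sketch of the producer settings involved. It uses plain `java.util.Properties` with the documented producer config keys, so it runs without the Kafka client on the classpath. The class name `IdempotentProducerConfigSketch` and the bootstrap address are assumptions for illustration. Note that the producer-level key is `max.in.flight.requests.per.connection`; the report quotes the shorter `max.in.flight.requests`, and the attached project is not shown here, so whether the setting actually reached the producer is not known.

```java
import java.util.Properties;

// Hypothetical helper; only the config keys themselves come from the Kafka producer documentation.
final class IdempotentProducerConfigSketch {

    /** Builds producer properties with idempotence enabled and one in-flight request per connection. */
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Idempotence requires acks=all and at most 5 in-flight requests per connection.
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        // Full key name; a value of 1 is the strictest setting.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder address.
        build("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Keep in mind that Kafka only guarantees ordering within a single partition, so these settings do not by themselves order records that land on different partitions of the topic.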
[GitHub] [kafka] jlprat commented on pull request #10711: MINOR: Update Scala to 2.13.6
jlprat commented on pull request #10711: URL: https://github.com/apache/kafka/pull/10711#issuecomment-842993232 cc @ijuma Could you please review this PR, as you were one of the last ones to do a similar change? Thanks in advance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12805) Aborted send could have a different exception than DisconnectException
[ https://issues.apache.org/jira/browse/KAFKA-12805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicolas Guyomar updated KAFKA-12805: Description: Right now we are treating timeout in the network client as a disconnection exception, which "hides" legit timeout where increasing {{[request.timeout.ms|http://request.timeout.ms/]}} could be considered OK when there is no "real" network disconnection : Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeConfigs, deadlineMs=1616147081029) timed out at 1616147081039 after 2 attempt(s) Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled describeConfigs request with correlation id 8 due to node 1 being disconnected the DisconnectException is thrown because of the disconnect flag being set to true in [https://github.com/apache/kafka/blob/3d0b4d910b681df7d873c8a0285eaca01d6c173a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L352] While we _could_ have a different path from there [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L793] that would propagate the fact that the connection timed out because of {{[request.timeout.ms|http://request.timeout.ms/]}} expiration, and adjust the later thrown exception in there [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L1195] so that it's not a {{DisconnectException}} ? 
Thank you was: Tight now we are treating timeout in the network client as a disconnection exception, which "hides" legit timeout where increasing {{[request.timeout.ms|http://request.timeout.ms/]}} could be considered OK when there is no "real" network disconnection : Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeConfigs, deadlineMs=1616147081029) timed out at 1616147081039 after 2 attempt(s) Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled describeConfigs request with correlation id 8 due to node 1 being disconnected the DisconnectException is thrown because of the disconnect flag being set to true in [https://github.com/apache/kafka/blob/3d0b4d910b681df7d873c8a0285eaca01d6c173a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L352] While we _could_ have a different path from there [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L793] that would propagate the fact that the connection timed out because of {{[request.timeout.ms|http://request.timeout.ms/]}} expiration, and adjust the later thrown exception in there [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L1195] so that it's not a {{DisconnectException}} ? 
Thank you > Aborted send could have a different exception than DisconnectException > -- > > Key: KAFKA-12805 > URL: https://issues.apache.org/jira/browse/KAFKA-12805 > Project: Kafka > Issue Type: Wish > Components: network >Reporter: Nicolas Guyomar >Priority: Minor > > Right now we are treating timeout in the network client as a disconnection > exception, which "hides" legit timeout where increasing > {{[request.timeout.ms|http://request.timeout.ms/]}} could be considered OK > when there is no "real" network disconnection : > Caused by: org.apache.kafka.common.errors.TimeoutException: > Call(callName=describeConfigs, deadlineMs=1616147081029) timed out at > 1616147081039 after 2 attempt(s) > Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled > describeConfigs request with correlation id 8 due to node 1 being disconnected > > the DisconnectException is thrown because of the disconnect flag being set to > true in > [https://github.com/apache/kafka/blob/3d0b4d910b681df7d873c8a0285eaca01d6c173a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L352] > While we _could_ have a different path from there > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L793] > that would propagate the fact that the connection timed out because of > {{[request.timeout.ms|http://request.timeout.ms/]}} expiration, and adjust > the later thrown exception in there > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L1195] > so that it's not a {{DisconnectException}} ? > > Thank you > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12751) ISRs remain in in-flight state if proposed state is same as actual state
[ https://issues.apache.org/jira/browse/KAFKA-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-12751. Reviewer: David Arthur Resolution: Fixed > ISRs remain in in-flight state if proposed state is same as actual state > > > Key: KAFKA-12751 > URL: https://issues.apache.org/jira/browse/KAFKA-12751 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.7.0, 2.8.0, 2.7.1 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Critical > Fix For: 2.7.2, 2.8.1 > > > If proposed ISR state in an AlterIsr request is the same as the actual state, > Controller returns a successful response without performing any updates. But > the broker code that processes the response leaves the ISR state in in-flight > state without committing. This prevents further ISR updates until the next > leader election. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12805) Aborted send could have a different exception than DisconnectException
Nicolas Guyomar created KAFKA-12805: --- Summary: Aborted send could have a different exception than DisconnectException Key: KAFKA-12805 URL: https://issues.apache.org/jira/browse/KAFKA-12805 Project: Kafka Issue Type: Wish Components: network Reporter: Nicolas Guyomar Right now we are treating timeout in the network client as a disconnection exception, which "hides" legit timeout where increasing {{[request.timeout.ms|http://request.timeout.ms/]}} could be considered OK when there is no "real" network disconnection : Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeConfigs, deadlineMs=1616147081029) timed out at 1616147081039 after 2 attempt(s) Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled describeConfigs request with correlation id 8 due to node 1 being disconnected the DisconnectException is thrown because of the disconnect flag being set to true in [https://github.com/apache/kafka/blob/3d0b4d910b681df7d873c8a0285eaca01d6c173a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L352] While we _could_ have a different path from there [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L793] that would propagate the fact that the connection timed out because of {{[request.timeout.ms|http://request.timeout.ms/]}} expiration, and adjust the later thrown exception in there [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L1195] so that it's not a {{DisconnectException}} ? Thank you -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] showuon commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on pull request #10552: URL: https://github.com/apache/kafka/pull/10552#issuecomment-842947967 Broken tests are fixed and new tests are added for multiple generation tests for unequal subscription cases. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi opened a new pull request #10716: optimize offset reset strategy and fix lose data when add partition
hudeqi opened a new pull request #10716: URL: https://github.com/apache/kafka/pull/10716
1. Besides `latest` and `earliest`, we also add `nearest`: reset to either latest or earliest depending on the current offset (i.e. this policy won't trigger when we see a partition for the first time, without committed offsets; it will only trigger for out-of-range offsets).
2. `latest-on-start`, `earliest-on-start`: reset to either latest or earliest only when we see the partition for the first time without a committed offset; when the offset is out of range, default to `none`, i.e. throw an exception.
3. An additional `limitTimeStamp` limit used for `latest/earliest/latest-on-start/earliest-on-start`: we only reset to latest / earliest if the partition's first record timestamp is smaller / larger than the given `limitTimeStamp` parameter; otherwise, we reset to earliest / latest. If `limitTimeStamp` is set to the timestamp at which the consumer group started, newly added partitions are reset to `earliest`, which avoids losing data.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
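The decision rules in points 1 and 2 of the pull request description can be sketched as a small pure function. This is an illustration of the described behaviour only, not the code in the pull request: the names `OffsetResetSketch`, `Policy` and `resolve` are hypothetical, `IllegalStateException` stands in for whatever exception the consumer would actually throw, an empty result for `nearest` without a committed offset is an assumption (the description only says the policy does not trigger), and the `limitTimeStamp` rule from point 3 is left out.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the proposed policies; not the implementation from the pull request.
final class OffsetResetSketch {

    enum Policy { NEAREST, LATEST_ON_START, EARLIEST_ON_START }

    /**
     * @param committed committed offset, or empty when the partition is seen for the first time
     * @param logStart  earliest available offset of the partition
     * @param logEnd    latest (end) offset of the partition
     * @return the offset to reset to, or empty when no reset is performed by the policy
     */
    static OptionalLong resolve(Policy policy, OptionalLong committed, long logStart, long logEnd) {
        if (!committed.isPresent()) {
            switch (policy) {
                case LATEST_ON_START:
                    return OptionalLong.of(logEnd);
                case EARLIEST_ON_START:
                    return OptionalLong.of(logStart);
                default:
                    // NEAREST does not trigger without a committed offset (assumed: no reset).
                    return OptionalLong.empty();
            }
        }
        long current = committed.getAsLong();
        if (current >= logStart && current <= logEnd) {
            return OptionalLong.empty(); // offset is in range, nothing to reset
        }
        if (policy == Policy.NEAREST) {
            // Below the log start: move up to earliest; above the log end: move back to latest.
            return OptionalLong.of(current < logStart ? logStart : logEnd);
        }
        // The *-on-start policies behave like `none` for out-of-range offsets.
        throw new IllegalStateException("Offset " + current + " is out of range [" + logStart + ", " + logEnd + "]");
    }

    public static void main(String[] args) {
        System.out.println(resolve(Policy.NEAREST, OptionalLong.of(5), 10, 100));
        System.out.println(resolve(Policy.LATEST_ON_START, OptionalLong.empty(), 10, 100));
    }
}
```

Modelling the decision as a function of the committed offset and the partition's offset range keeps the three policies easy to compare side by side; how the real consumer obtains those inputs is outside the scope of this sketch.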
[jira] [Updated] (KAFKA-12796) Removal of deprecated classes under `streams-scala`
[ https://issues.apache.org/jira/browse/KAFKA-12796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12796: Labels: scala (was: ) > Removal of deprecated classes under `streams-scala` > --- > > Key: KAFKA-12796 > URL: https://issues.apache.org/jira/browse/KAFKA-12796 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Josep Prat >Assignee: Josep Prat >Priority: Major > Labels: scala > > There are 3 different classes that are deprecated under the streams-scala > submodule: > * > streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala > * > streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala > * > -streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala- > As far as I can tell, none of them are in use internally and could be removed > for release 3.0.0. > > Does this change require a KIP? > Update: only classes/methods deprecated since at least 2.5 can be removed. > This means `Serdes` needs to stay till 4.0. I'll create tickets for > methods/classes that are deprecated, but not for long enough. -- This message was sent by Atlassian Jira (v8.3.4#803005)