[GitHub] [kafka] yashmayya commented on pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic
yashmayya commented on PR #12947: URL: https://github.com/apache/kafka/pull/12947#issuecomment-1336336173 Hi @C0urante, would you be able to take a look at this whenever possible? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #12949: MINOR: Remove unused `ApiUtils`
ijuma merged PR #12949: URL: https://github.com/apache/kafka/pull/12949
[GitHub] [kafka] ijuma commented on pull request #12948: MINOR: Add JDK 19 & Scala 2.13 CI build
ijuma commented on PR #12948: URL: https://github.com/apache/kafka/pull/12948#issuecomment-1336297376 #12675 needs to be merged for this to work.
[GitHub] [kafka] ijuma commented on a diff in pull request #12675: KAFKA-14256: Upgrade from Scala 2.13.8 to 2.13.10
ijuma commented on code in PR #12675: URL: https://github.com/apache/kafka/pull/12675#discussion_r1038894058

## build.gradle: ##
@@ -647,6 +647,9 @@ subprojects {
         "-Xlint:unused"
       ]

+if (versions.baseScala == '2.13')
+  scalaCompileOptions.additionalParameters += ["-Wconf:msg=@nowarn annotation does not suppress any warnings:s"] // See https://github.com/scala/scala/pull/9960

Review Comment:
Why do we have to do this instead of removing the suppressions that are no longer needed?
[GitHub] [kafka] ijuma opened a new pull request, #12950: MINOR: Remove KafkaTimer
ijuma opened a new pull request, #12950: URL: https://github.com/apache/kafka/pull/12950

It doesn't add much value since lambdas were introduced in Java 8. Also remove KafkaTimerTest.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ijuma opened a new pull request, #12949: MINOR: Remove unused `ApiUtils`
ijuma opened a new pull request, #12949: URL: https://github.com/apache/kafka/pull/12949

Also remove `ApiUtilsTest`.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-14437) Enhance StripedReplicaPlacer to account for existing partition assignments
[ https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Grant updated KAFKA-14437:
    Description:
Currently, StripedReplicaPlacer does not take existing partition assignments into consideration when the place method is called. This means that newly added partitions may get the same assignments as existing partitions. This differs from AdminUtils, which has some logic to shift where in the list of brokers we start making assignments from for newly added partitions.

For example, let’s say we had the following:
{code:java}
Rack 1: 0, 1, 2, 3
Rack 2: 4, 5, 6, 7
Rack 3: 8, 9, 10, 11
{code}
CreateTopics might return the following assignment for two partitions:
{code:java}
P0: 6, 8, 2
P1: 9, 3, 7
{code}
If the user then calls CreatePartitions, increasing the partition count to 4, StripedReplicaPlacer does not take P0 and P1 into account. It creates a random rack offset and a random broker offset, so it could easily create the same assignment for P2 and P3 that it created for P0 and P1. This is easily reproduced in a unit test.

My suggestion is to enhance StripedReplicaPlacer to account for existing partition assignments. Intuitively, we’d like to make assignments for added partitions from “where we left off” when we were making the previous assignments. In practice, it’s not possible to know exactly what the state was during the previous partition assignments because, for example, brokers’ fencing state may have changed. But I do think we can make a best-effort attempt that is optimized for the common case where most brokers were and are unfenced.

Note that all the changes suggested below only affect StripedReplicaPlacer when place is called and there are existing partition assignments, which happens when it’s servicing CreatePartitions requests. If there are no existing partition assignments, which happens during CreateTopics, the logic is unchanged.

First, we need to update ClusterDescriber to:
{code:java}
public interface ClusterDescriber {
    /**
     * Get an iterator through the usable brokers.
     */
    Iterator<UsableBroker> usableBrokers();

    List<List<Integer>> replicasForTopicName(String topicName);
}
{code}
The replicasForTopicName method returns the existing partition assignments. This will enable StripedReplicaPlacer to know about existing partition assignments if they exist.

When place is called, some initialization is done in both RackList and BrokerList. One thing that is initialized is the offset variable: this is a variable used in both RackList and BrokerList that determines where in the list of racks or brokers, respectively, we should start from when making the next assignment. Currently, it is initialized to a random value, based on the size of the list.

I suggest we add some logic during initialization that sets the offset for both RackList and BrokerList to a value based on the previous assignments. Consider again the following rack metadata and existing assignments:
{code:java}
Rack 1: 0, 1, 2, 3
Rack 2: 4, 5, 6, 7
Rack 3: 8, 9, 10, 11

P0: 6, 8, 2
P1: 9, 3, 7
{code}
Let’s imagine a user wants to create a new partition, P2.

First, we need to determine which rack to start from for P2: this corresponds to the initial offset in RackList. We can look at the leader of P1 (not P0, because P1 is the “last” partition we made an assignment for) and see it’s on rack 3. So the next rack we should start from is rack 1. This means we set offset in RackList to 0, instead of a random value, during initialization.

Second, we need to determine which broker to start from _per rack_: this corresponds to the initial offset in BrokerList. We can look at all the existing partition assignments, P0 and P1 in our example, and _per rack_ infer the last offset started from during previous assignments. For each rack, we do this by iterating through the partitions in reverse order (because we care about the most recent starting position) and finding the first broker in the assignment that belongs to the rack. This tells us where we last started from when making an assignment for that rack, which can be used to determine where to continue from within that rack.

So in our example:
 * For rack 1, the last broker we started from was broker 3 in P1, so the next broker we should choose for that rack is 0, which means the initial offset is set to 0 in the BrokerList for rack 1 during initialization.
 * For rack 2, the last broker we started with was broker 7 in P1, so the next broker should be 4, which means the offset is 0 in the BrokerList for rack 2.
 * For rack 3, the last broker we started with was broker 9 in P1, so the next broker should be 10, which means the offset is 2 in the BrokerList for rack 3.

was: Currently, in StripedReplicaPlacer we don’t take existing partition assignments into consideration
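The offset inference proposed in KAFKA-14437 above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the StripedReplicaPlacer implementation: the class `OffsetInference` and both method names are hypothetical, racks are modelled as plain lists of broker ids, fencing is ignored, and a fallback offset of 0 stands in for the random offset the real placer would presumably keep using when nothing can be inferred.

```java
import java.util.List;

/** Hypothetical helper for illustration; not part of Kafka. */
public class OffsetInference {

    /**
     * Rack offset: the index of the rack that follows the rack holding the
     * leader (first replica) of the most recently assigned partition.
     */
    public static int nextRackOffset(List<List<Integer>> racks, List<List<Integer>> assignments) {
        if (assignments.isEmpty()) {
            return 0; // assumption: stand-in for the existing random offset
        }
        int lastLeader = assignments.get(assignments.size() - 1).get(0);
        for (int i = 0; i < racks.size(); i++) {
            if (racks.get(i).contains(lastLeader)) {
                return (i + 1) % racks.size();
            }
        }
        return 0; // the leader's rack is no longer known
    }

    /**
     * Broker offset for one rack: walk the partitions newest-first, find the
     * first replica in the assignment that lives on this rack, and continue
     * from the broker after it.
     */
    public static int nextBrokerOffset(List<Integer> rackBrokers, List<List<Integer>> assignments) {
        for (int p = assignments.size() - 1; p >= 0; p--) {
            for (Integer replica : assignments.get(p)) {
                int idx = rackBrokers.indexOf(replica);
                if (idx >= 0) {
                    return (idx + 1) % rackBrokers.size();
                }
            }
        }
        return 0; // no existing replica on this rack
    }
}
```

With the rack layout and the P0/P1 assignments from the description, this yields a rack offset of 0 and broker offsets of 0, 0 and 2 for racks 1, 2 and 3, matching the values worked out in the ticket.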
[GitHub] [kafka] ijuma opened a new pull request, #12948: MINOR: Add JDK 19 & Scala 2.13 CI build
ijuma opened a new pull request, #12948: URL: https://github.com/apache/kafka/pull/12948

It's good for us to track the latest JDK version supported by Gradle. Given that Scala 2.12 support has been deprecated, I did not include a Scala 2.12 variant.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mumrah commented on pull request #12946: KAFKA-14427 ZK client support for migrations
mumrah commented on PR #12946: URL: https://github.com/apache/kafka/pull/12946#issuecomment-1336208057 @dengziming
[GitHub] [kafka] yashmayya opened a new pull request, #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic
yashmayya opened a new pull request, #12947: URL: https://github.com/apache/kafka/pull/12947

- From the [JIRA ticket](https://issues.apache.org/jira/browse/KAFKA-6586):
  > The main methods in ConnectDistributed and ConnectStandalone have a lot of duplication, and it'd be good to refactor to centralize the logic. We can pull most of this logic into an abstract class that ConnectStandalone and ConnectDistributed both extend. At a glance, the differences between the two are different config and Herder implementations and some different initialization logic.
- This refactor also allows for a straightforward implementation of https://issues.apache.org/jira/browse/KAFKA-3815 if that were to be pursued.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
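The shape of the refactor described in the ticket (shared logic in an abstract class, with config and Herder creation left to the two executables) might look something like the sketch below. Every name here is hypothetical and chosen only for illustration: `AbstractCli`, `StandaloneCli`, `DistributedCli`, the `Herder` stand-in interface and the usage strings are assumptions, not the classes or messages in the PR.

```java
import java.util.Map;

/** Illustrative template-method sketch; all names are hypothetical. */
public class ConnectCliSketch {

    /** Minimal stand-in for Connect's Herder, just enough for the sketch. */
    public interface Herder {
        String mode();
    }

    /** Shared start-up flow; subclasses supply only the parts that differ. */
    public abstract static class AbstractCli {
        protected abstract String usage();

        protected abstract Herder createHerder(Map<String, String> workerProps);

        /** Common logic that would otherwise be duplicated in both main methods. */
        public final Herder start(Map<String, String> workerProps) {
            if (workerProps.isEmpty()) {
                throw new IllegalArgumentException(usage());
            }
            return createHerder(workerProps);
        }
    }

    public static class StandaloneCli extends AbstractCli {
        @Override
        protected String usage() {
            return "usage: standalone worker.properties [connector.properties ...]";
        }

        @Override
        protected Herder createHerder(Map<String, String> workerProps) {
            return () -> "standalone";
        }
    }

    public static class DistributedCli extends AbstractCli {
        @Override
        protected String usage() {
            return "usage: distributed worker.properties";
        }

        @Override
        protected Herder createHerder(Map<String, String> workerProps) {
            return () -> "distributed";
        }
    }
}
```

Keeping `start` final in the base class is one way to make sure both executables go through the same initialization order while still choosing their own Herder.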
[jira] [Assigned] (KAFKA-6586) Refactor Connect executables
[ https://issues.apache.org/jira/browse/KAFKA-6586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yash Mayya reassigned KAFKA-6586:
    Assignee: Yash Mayya

> Refactor Connect executables
>
>                 Key: KAFKA-6586
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6586
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Randall Hauch
>            Assignee: Yash Mayya
>            Priority: Minor
>
> The main methods in {{ConnectDistributed}} and {{ConnectStandalone}} have a
> lot of duplication, and it'd be good to refactor to centralize the logic. We
> can pull most of this logic into an abstract class that {{ConnectStandalone}}
> and {{ConnectDistributed}} both extend. At a glance, the differences between
> the two are different config and Herder implementations and some different
> initialization logic.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14440) Local state wipeout with EOS
[ https://issues.apache.org/jira/browse/KAFKA-14440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Abdullah alkhawatrah updated KAFKA-14440:
    Description:
Hey,

I have a Kafka Streams service that aggregates events from multiple input topics (running in a k8s cluster). The topology has multiple FKJs. The input topics had around 7 billion events when the service was started from `earliest`.

The service has EOS enabled and
{code:java}
transaction.timeout.ms: 60{code}
The problem I am having is frequent local state wipe-outs, which lead to very long rebalances. As can be seen from the attached images, local disk sizes go to ~0 very often. These wipe-outs are part of the EOS guarantee, based on this log message:
{code:java}
State store transfer-store did not find checkpoint offsets while stores are not empty, since under EOS it has the risk of getting uncommitted data in stores we have to treat it as a task corruption error and wipe out the local state of task 1_8 before re-bootstrapping{code}
I noticed that this happens as a result of one of the following:
 * The process gets a SIGKILL when running out of memory, or when it fails to shut down gracefully (on pod rotation, for example). This explains the missing local checkpoint file, but I thought local checkpoint updates were frequent, so I expected only part of the state to be reset, not the whole local state.
 * Although we have a long transaction timeout config, the following appears many times in the logs, after which Kafka Streams gets into an error state. On startup, the local checkpoint file is not found:
{code:java}
Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.{code}
The service has 10 instances, all showing the same behaviour. The issue disappears when EOS is disabled.

The Kafka cluster runs Kafka 2.6, with a minimum ISR of 3.
> Local state wipeout with EOS
>
>                 Key: KAFKA-14440
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14440
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.2.3
>            Reporter: Abdullah alkhawatrah
>            Priority: Major
>         Attachments: Screenshot 2022-12-02 at 09.26.27.png
>
> Hey,
> I have a kafka streams service that aggregates events from multiple input
> topics (running in a k8s cluster). The topology has multiple FKJs. The input
> topics have around 7 billion events when the service was started from
> `earliest`.
> The service has EOS enabled and
> {code:java}
> transaction.timeout.ms: 60{code}
> The problem I am having is with frequent local state wipe-outs, this leads to
> very long rebalances. As can be seen from the attached images, local disk
> sizes go to ~ 0 very often. These wipe out are part of the EOS guarantee
> based on this log message:
> {code:java}
> State store transfer-store did not find checkpoint offsets while stores are
> not empty, since under EOS it has the risk of
[jira] [Created] (KAFKA-14440) Local state wipeout with EOS
Abdullah alkhawatrah created KAFKA-14440:

             Summary: Local state wipeout with EOS
                 Key: KAFKA-14440
                 URL: https://issues.apache.org/jira/browse/KAFKA-14440
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.2.3
            Reporter: Abdullah alkhawatrah
         Attachments: Screenshot 2022-12-02 at 09.26.27.png

Hey,

I have a Kafka Streams service that aggregates events from multiple input topics (running in a k8s cluster). The topology has multiple FKJs. The input topics had around 7 billion events when the service was started from `earliest`.

The service has EOS enabled and `[transaction.timeout.ms|http://transaction.timeout.ms/]` is `60`.

The problem I am having is frequent local state wipe-outs, which lead to very long rebalances. As can be seen from the attached images, local disk sizes go to ~0 very often. These wipe-outs are part of the EOS guarantee, based on this log message:

State store transfer-store did not find checkpoint offsets while stores are not empty, since under EOS it has the risk of getting uncommitted data in stores we have to treat it as a task corruption error and wipe out the local state of task 1_8 before re-bootstrapping

I noticed that this happens as a result of one of the following:
 * The process gets a SIGKILL when running out of memory, or when it fails to shut down gracefully (on pod rotation, for example). This explains the missing local checkpoint file, but I thought local checkpoint updates were frequent, so I expected only part of the state to be reset, not the whole local state.
 * Although we have a long transaction timeout config, the following appears many times in the logs, after which Kafka Streams gets into an error state. On startup, the local checkpoint file is not found:

Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.

The service has 10 instances, all showing the same behaviour. The issue disappears when EOS is disabled.

The Kafka cluster runs Kafka 2.6, with a minimum ISR of 3.
[GitHub] [kafka] yashmayya commented on a diff in pull request #12800: KAFKA-14342: KafkaOffsetBackingStore should clear offsets for source partitions on tombstone messages
yashmayya commented on code in PR #12800: URL: https://github.com/apache/kafka/pull/12800#discussion_r1038756498

## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java: ##
@@ -329,6 +329,60 @@ public void testGetSetNull() throws Exception {
         PowerMock.verifyAll();
     }

+    @Test
+    public void testTombstoneOffset() throws Exception {
+        expectConfigure();
+        expectStart(Collections.singletonList(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+                new RecordHeaders(), Optional.empty())));
+
+        Capture<org.apache.kafka.clients.producer.Callback> producerCallback = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(producerCallback));
+        PowerMock.expectLastCall();
+
+        final Capture<Callback<Void>> readToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(readToEndCallback));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), null,
+                            new RecordHeaders(), Optional.empty()));
+            readToEndCallback.getValue().onCompletion(null, null);
+            return null;
+        });
+
+        expectStop();
+        expectClusterId();
+
+        PowerMock.replayAll();
+
+        store.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        store.start();
+
+        // Write tombstone offset
+        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
+        toSet.put(TP0_KEY, null);
+
+        final AtomicBoolean invoked = new AtomicBoolean(false);
+        Future<Void> setFuture = store.set(toSet, (error, result) -> invoked.set(true));
+        assertFalse(setFuture.isDone());
+        producerCallback.getValue().onCompletion(null, null);
+        setFuture.get(1, TimeUnit.MILLISECONDS);
+        assertTrue(invoked.get());
+
+        // Getting data should read to end of our published data and return it
+        Map<ByteBuffer, ByteBuffer> offsets = store.get(Collections.singletonList(TP0_KEY)).get(1, TimeUnit.MILLISECONDS);
+        assertNull(offsets.get(TP0_KEY));
+
+        // Just verifying that KafkaOffsetBackingStore::get returns null isn't enough, we also need to verify that the mapping for the source partition key is removed.
+        // This is because KafkaOffsetBackingStore::get returns null if either there is no existing offset for the source partition or if there is an offset with null value.
+        // We need to make sure that tombstoned offsets are removed completely (i.e. that the mapping for the corresponding source partition is removed).
+        HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
+        assertFalse(data.containsKey(TP0_KEY));

Review Comment:
I've done a rebase and re-written the test. IMO just adding the `store.data.containsKey` check to the end of `testGetSetNull` wouldn't test the case that we're actually trying to cover with this change - i.e. existing offsets are cleared from the in-memory map on tombstone offsets. `testGetSetNull` is just checking if null keys and null values can be set without issues, and instead of rewriting it to first write a non-null value and then attempt to clear it with a tombstone I felt like a separate test case would be better.

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java: ##
@@ -325,11 +325,12 @@ public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback
         return producerCallback;
     }

-    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
-        @Override
-        public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
-            ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
-            ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
+    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
+        ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;

Review Comment:
While the suggestion makes sense and I've added it, it looks like the `error` will never be non-null as of now.
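The distinction the review comment draws (a `get` that returns null versus a key that is actually gone from the map) can be shown with a small standalone sketch. It assumes that a tombstone is a record with a null value and that the in-memory view is a plain `HashMap`; the class `TombstoneSketch` and its methods are hypothetical and are not the KafkaOffsetBackingStore code.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory offset view for illustration; not Kafka Connect code. */
public class TombstoneSketch {
    private final Map<ByteBuffer, ByteBuffer> data = new HashMap<>();

    /** Applies one consumed record: a null value is treated as a tombstone. */
    public void onRecord(byte[] key, byte[] value) {
        ByteBuffer k = key != null ? ByteBuffer.wrap(key) : null;
        if (value == null) {
            data.remove(k); // drop the mapping instead of storing key -> null
        } else {
            data.put(k, ByteBuffer.wrap(value));
        }
    }

    /** Returns null both for a missing key and for a key mapped to null. */
    public ByteBuffer get(byte[] key) {
        return data.get(ByteBuffer.wrap(key));
    }

    /** Distinguishes "no mapping" from "mapping with a null value". */
    public boolean containsKey(byte[] key) {
        return data.containsKey(ByteBuffer.wrap(key));
    }
}
```

Because `get` cannot tell the two cases apart, a test that wants to prove the mapping was removed has to check `containsKey`, which is what the new test case does against the store's internal map.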
[jira] [Commented] (KAFKA-14439) Specify returned errors for various APIs and versions
[ https://issues.apache.org/jira/browse/KAFKA-14439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642800#comment-17642800 ]

Tom Bentley commented on KAFKA-14439:

FWIW I spent some time on this a few years ago, see [KAFKA-7787|https://issues.apache.org/jira/browse/KAFKA-7787].

> Specify returned errors for various APIs and versions
>
>                 Key: KAFKA-14439
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14439
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Justine Olshan
>            Priority: Major
>
> Kafka is known for supporting various clients and being compatible across
> different versions. But one thing that is a bit unclear is what errors each
> response can send.
> Knowing what errors can come from each version helps those who implement
> clients have a more defined spec for what errors they need to handle. When
> new errors are added, it is clearer to the clients that changes need to be
> made.
> It also helps contributors get a better understanding about how clients are
> expected to react and potentially find and prevent gaps like the one found in
> https://issues.apache.org/jira/browse/KAFKA-14417
> I briefly synced offline with [~hachikuji] about this and he suggested maybe
> adding values for the error codes in the schema definitions of APIs that
> specify the error codes and what versions they are returned on. One idea was
> creating some enum type to accomplish this.
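One possible reading of the "enum type" idea in the ticket is an enum whose constants each declare the API version range in which they can be returned. The sketch below is purely hypothetical: the error names `EXAMPLE_ERROR_A` and `EXAMPLE_ERROR_B`, the version ranges, and the class `ApiErrorSpecSketch` are invented for illustration and do not describe any real Kafka API or schema.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical per-API error declaration; names and versions are made up. */
public class ApiErrorSpecSketch {

    public enum DeclaredError {
        EXAMPLE_ERROR_A((short) 0, Short.MAX_VALUE), // returned by every version
        EXAMPLE_ERROR_B((short) 3, Short.MAX_VALUE); // only returned from version 3 on

        private final short minVersion;
        private final short maxVersion;

        DeclaredError(short minVersion, short maxVersion) {
            this.minVersion = minVersion;
            this.maxVersion = maxVersion;
        }

        /** True if a response of the given API version may carry this error. */
        public boolean returnedIn(short apiVersion) {
            return apiVersion >= minVersion && apiVersion <= maxVersion;
        }
    }

    /** All errors a client must be prepared to handle for one API version. */
    public static Set<DeclaredError> errorsFor(short apiVersion) {
        Set<DeclaredError> result = EnumSet.noneOf(DeclaredError.class);
        for (DeclaredError error : DeclaredError.values()) {
            if (error.returnedIn(apiVersion)) {
                result.add(error);
            }
        }
        return result;
    }
}
```

A client implementer could then query `errorsFor` with the negotiated version to see which errors need handling, and a newly added constant would make the change visible in one place.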