[GitHub] [kafka] yashmayya commented on pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

2022-12-03 Thread GitBox


yashmayya commented on PR #12947:
URL: https://github.com/apache/kafka/pull/12947#issuecomment-1336336173

   Hi @C0urante, would you be able to take a look at this whenever possible?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma merged pull request #12949: MINOR: Remove unused `ApiUtils`

2022-12-03 Thread GitBox


ijuma merged PR #12949:
URL: https://github.com/apache/kafka/pull/12949





[GitHub] [kafka] ijuma commented on pull request #12948: MINOR: Add JDK 19 & Scala 2.13 CI build

2022-12-03 Thread GitBox


ijuma commented on PR #12948:
URL: https://github.com/apache/kafka/pull/12948#issuecomment-1336297376

   #12675 needs to be merged for this to work.





[GitHub] [kafka] ijuma commented on a diff in pull request #12675: KAFKA-14256: Upgrade from Scala 2.13.8 to 2.13.10

2022-12-03 Thread GitBox


ijuma commented on code in PR #12675:
URL: https://github.com/apache/kafka/pull/12675#discussion_r1038894058


##
build.gradle:
##
@@ -647,6 +647,9 @@ subprojects {
   "-Xlint:unused"
 ]
 
+if (versions.baseScala == '2.13')
+  scalaCompileOptions.additionalParameters += ["-Wconf:msg=@nowarn annotation does not suppress any warnings:s"] // See https://github.com/scala/scala/pull/9960

Review Comment:
   Why do we have to do this instead of removing the suppressions that are no 
longer needed?






[GitHub] [kafka] ijuma opened a new pull request, #12950: MINOR: Remove KafkaTimer

2022-12-03 Thread GitBox


ijuma opened a new pull request, #12950:
URL: https://github.com/apache/kafka/pull/12950

   It doesn't add much value since lambdas
   were introduced in Java 8.
   
   Also remove KafkaTimerTest.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] ijuma opened a new pull request, #12949: MINOR: Remove unused `ApiUtils`

2022-12-03 Thread GitBox


ijuma opened a new pull request, #12949:
URL: https://github.com/apache/kafka/pull/12949

   Also remove `ApiUtilsTest`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-14437) Enhance StripedReplicaPlacer to account for existing partition assignments

2022-12-03 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-14437:
-
Description: 
Currently, StripedReplicaPlacer does not take existing partition assignments into consideration when the place method is called. This means newly added partitions may get the same assignments as existing partitions. This differs from AdminUtils, which has some logic that shifts where in the list of brokers assignments start for newly added partitions.

For example, let's say we had the following
{code:java}
Rack 1: 0, 1, 2, 3
Rack 2: 4, 5, 6, 7
Rack 3: 8, 9, 10, 11
{code}
CreateTopics might return the following assignments for two partitions:
{code:java}
P0: 6, 8, 2
P1: 9, 3, 7
{code}
If the user then calls CreatePartitions to increase the partition count to 4, StripedReplicaPlacer does not take P0 and P1 into account. It creates a random rack offset and a random broker offset, so it could easily create the same assignments for the new partitions P2 and P3 that it created for P0 and P1. This is easily reproduced in a unit test, as sketched below.
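To make the collision concrete, a tiny illustrative snippet (not Kafka code; the counts assume the example's 3 racks and 4 brokers per rack):
{code:java}
import java.util.Random;

public class RandomOffsetCollision {
    public static void main(String[] args) {
        Random random = new Random();
        // StripedReplicaPlacer-style initialization: fresh random offsets,
        // completely independent of whatever offsets produced P0 and P1.
        int rackOffset = random.nextInt(3);   // 3 racks in the example
        int brokerOffset = random.nextInt(4); // 4 brokers per rack
        // Nothing ties these draws to the earlier ones, so they can equal
        // them exactly, repeating the existing assignments for new partitions.
        System.out.println(rackOffset + ", " + brokerOffset);
    }
}
{code}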

My suggestion is to enhance StripedReplicaPlacer to account for existing partition assignments. Intuitively, we'd like to make assignments for added partitions from “where we left off” when making the previous assignments. In practice, it's not possible to know exactly what the state was during the previous partition assignments because, for example, broker fencing state may have changed. But I do think we can make a best-effort attempt that is optimized for the common case where most brokers were and are unfenced.

Note: all the changes suggested below only affect StripedReplicaPlacer when place is called and there are existing partition assignments, which happens when it is servicing CreatePartitions requests. If there are no existing partition assignments, which happens during CreateTopics, the logic is unchanged.

First, we need to update ClusterDescriber to:
{code:java}
public interface ClusterDescriber {
    /**
     * Get an iterator through the usable brokers.
     */
    Iterator<UsableBroker> usableBrokers();

    List<List<Integer>> replicasForTopicName(String topicName);
}
{code}
The replicasForTopicName method returns the existing partition assignments. This enables StripedReplicaPlacer to know about existing partition assignments when they exist.
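For illustration, a minimal sketch of an implementation backed by an in-memory snapshot (the class name and backing map are hypothetical; UsableBroker is assumed to be the 3.3-era placement type):
{code:java}
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.kafka.metadata.placement.UsableBroker;

// Hypothetical sketch: serves the proposed interface from a snapshot of
// existing assignments (topic name -> list of replica lists per partition).
public class SnapshotClusterDescriber implements ClusterDescriber {
    private final List<UsableBroker> brokers;
    private final Map<String, List<List<Integer>>> assignments;

    public SnapshotClusterDescriber(List<UsableBroker> brokers,
                                    Map<String, List<List<Integer>>> assignments) {
        this.brokers = brokers;
        this.assignments = assignments;
    }

    @Override
    public Iterator<UsableBroker> usableBrokers() {
        return brokers.iterator();
    }

    @Override
    public List<List<Integer>> replicasForTopicName(String topicName) {
        return assignments.get(topicName);
    }
}
{code}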

When place is called, some initialization is done in both RackList and BrokerList. One thing that is initialized is the offset variable: used in both RackList and BrokerList, it determines where in the list of racks or brokers, respectively, we should start from when making the next assignment. Currently, it is initialized to a random value based on the size of the list.

I suggest we add some logic during initialization that sets the offset for both RackList and BrokerList to a value based on the previous assignments.

Consider again the following rack metadata and existing assignments:
{code:java}
Rack 1: 0, 1, 2, 3
Rack 2: 4, 5, 6, 7
Rack 3: 8, 9, 10, 11
 
P0: 6, 8, 2
P1: 9, 3, 7  
{code}
Let's imagine a user wants to create a new partition, P2.

First, we need to determine which rack to start from for P2: this corresponds to the initial offset in RackList. We can look at the leader of P1 (not P0, because P1 is the “last” partition we made an assignment for) and see it's on rack 3. So the next rack we should start from is rack 1, which means we set the offset in RackList to 0, instead of a random value, during initialization.

Second, we need to determine which broker to start from _per rack_: this corresponds to the initial offset in BrokerList. We can look at all the existing partition assignments, P0 and P1 in our example, and _per rack_ infer the last offset started from during previous assignments. For each rack, we do this by iterating through each partition in reverse order (because we care about the most recent starting position) and trying to find the first broker in the assignment from that rack. This tells us where we last started from when making an assignment for that rack, which can be used to determine where to continue from within that rack.

So in our example, for rack 1 we can see the last broker we started from was broker 3 in P1, so the next broker we should choose for that rack is 0, which means the initial offset is set to 0 in the BrokerList for rack 1 during initialization. For rack 2 the last broker we started with was broker 7 in P1, so the next broker should be 4, which means the offset is 0 in the BrokerList for rack 2. For rack 3 the last broker we started with was broker 9 in P1, so the next broker should be 10, which means the offset is 2 in the BrokerList for rack 3.
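A rough sketch of that inference (helper names are hypothetical, not the actual RackList/BrokerList internals):
{code:java}
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the offset inference described above.
public final class OffsetInference {

    // RackList offset: start from the rack after the one hosting the leader
    // (first replica) of the last existing partition.
    static int initialRackOffset(List<String> racks, Map<Integer, String> rackOfBroker,
                                 List<List<Integer>> existing) {
        List<Integer> lastPartition = existing.get(existing.size() - 1);
        String leaderRack = rackOfBroker.get(lastPartition.get(0));
        return (racks.indexOf(leaderRack) + 1) % racks.size();
    }

    // BrokerList offset for one rack: scan partitions in reverse, find the
    // first replica on this rack, and start from the broker after it.
    static int initialBrokerOffset(List<Integer> rackBrokers, String rack,
                                   Map<Integer, String> rackOfBroker,
                                   List<List<Integer>> existing) {
        for (int p = existing.size() - 1; p >= 0; p--) {
            for (int broker : existing.get(p)) {
                if (rack.equals(rackOfBroker.get(broker))) {
                    return (rackBrokers.indexOf(broker) + 1) % rackBrokers.size();
                }
            }
        }
        return 0; // no history for this rack; any starting point works
    }
}
{code}
Run against the example above (P0: 6, 8, 2; P1: 9, 3, 7), this yields rack offset 0 and broker offsets 0, 0 and 2 for racks 1, 2 and 3, matching the walkthrough.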

  was:
Currently, in StripedReplicaPlacer we don’t take existing partition assignments 
into consideration 

[GitHub] [kafka] ijuma opened a new pull request, #12948: MINOR: Add JDK 19 & Scala 2.13 CI build

2022-12-03 Thread GitBox


ijuma opened a new pull request, #12948:
URL: https://github.com/apache/kafka/pull/12948

   It's good for us to track the latest JDK version supported
   by Gradle.
   
   Given that Scala 2.12 support has been deprecated,
   I did not include a Scala 2.12 variant.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] mumrah commented on pull request #12946: KAFKA-14427 ZK client support for migrations

2022-12-03 Thread GitBox


mumrah commented on PR #12946:
URL: https://github.com/apache/kafka/pull/12946#issuecomment-1336208057

   @dengziming 





[GitHub] [kafka] yashmayya opened a new pull request, #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

2022-12-03 Thread GitBox


yashmayya opened a new pull request, #12947:
URL: https://github.com/apache/kafka/pull/12947

   - From the [JIRA ticket](https://issues.apache.org/jira/browse/KAFKA-6586):
   > The main methods in ConnectDistributed and ConnectStandalone have a lot of 
duplication, and it'd be good to refactor to centralize the logic. We can pull 
most of this logic into an abstract class that ConnectStandalone and 
ConnectDistributed both extend. At a glance, the differences between the two 
are different config and Herder implementations and some different 
initialization logic.
   
   - This refactor also allows for a straightforward implementation of 
https://issues.apache.org/jira/browse/KAFKA-3815 if that were to be pursued.
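   For illustration, a minimal sketch of the abstract-class shape the ticket excerpt above describes (class and method names here are hypothetical, not necessarily what this PR uses):
```java
import java.util.Map;

import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.RestServer;

// Hypothetical sketch; actual names in the PR may differ.
public abstract class AbstractConnectCli {

    // Shared startup flow currently duplicated across ConnectDistributed
    // and ConnectStandalone: load worker props, build the mode-specific
    // herder, wire up the REST server, and start.
    public Connect startConnect(String workerPropsFile) throws Exception {
        Map<String, String> workerProps =
                Utils.propsToStringMap(Utils.loadProps(workerPropsFile));
        RestServer rest = createRestServer(workerProps);
        Herder herder = createHerder(workerProps, rest);
        Connect connect = new Connect(herder, rest);
        connect.start();
        return connect;
    }

    // Mode-specific pieces: the two executables differ mainly in their
    // config and Herder implementations plus some initialization logic.
    protected abstract Herder createHerder(Map<String, String> workerProps, RestServer rest);

    protected abstract RestServer createRestServer(Map<String, String> workerProps);
}
```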
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Assigned] (KAFKA-6586) Refactor Connect executables

2022-12-03 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-6586:
-

Assignee: Yash Mayya

> Refactor Connect executables
> 
>
> Key: KAFKA-6586
> URL: https://issues.apache.org/jira/browse/KAFKA-6586
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Randall Hauch
>Assignee: Yash Mayya
>Priority: Minor
>
> The main methods in {{ConnectDistributed}} and {{ConnectStandalone}} have a 
> lot of duplication, and it'd be good to refactor to centralize the logic. We 
> can pull most of this logic into an abstract class that {{ConnectStandalone}} 
> and {{ConnectDistributed}} both extend. At a glance, the differences between 
> the two are different config and Herder implementations and some different 
> initialization logic.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14440) Local state wipeout with EOS

2022-12-03 Thread Abdullah alkhawatrah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdullah alkhawatrah updated KAFKA-14440:
-
Description: 
Hey,

I have a Kafka Streams service that aggregates events from multiple input topics (running in a k8s cluster). The topology has multiple foreign-key joins (FKJs). The input topics had around 7 billion events when the service was started from `earliest`.

The service has EOS enabled and 
{code:java}
transaction.timeout.ms: 60{code}
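(For reference, EOS in Kafka Streams is enabled via the processing-guarantee config; a minimal sketch, with a placeholder application id and timeout value rather than the reporter's actual settings:)
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregator"); // placeholder
// Exactly-once processing (EXACTLY_ONCE_V2 requires brokers >= 2.5)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// transaction.timeout.ms is a producer-level config; placeholder value shown
props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 60_000);
{code}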
The problem I am having is frequent local state wipe-outs, which lead to very long rebalances. As can be seen from the attached images, local disk sizes go to ~0 very often. These wipe-outs are part of the EOS guarantee, based on this log message:
{code:java}
State store transfer-store did not find checkpoint offsets while stores are not 
empty, since under EOS it has the risk of getting uncommitted data in stores we 
have to treat it as a task corruption error and wipe out the local state of 
task 1_8 before re-bootstrapping{code}
 

I noticed that this happens as a result of one of the following:
 * The process gets SIGKILLed when running out of memory, or on failure to shut down gracefully (on pod rotation, for example). This explains the missing local checkpoint file, but I thought local checkpoint updates were frequent, so I expected part of the state to be reset, not the whole local state.
 * Although we have a long transaction timeout configured, the following appears many times in the logs, after which Kafka Streams enters an error state. On startup, the local checkpoint file is not found:

{code:java}
Transiting to abortable error state due to 
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.{code}
The service has 10 instances, all showing the same behaviour. The issue disappears when EOS is disabled.

The Kafka cluster runs Kafka 2.6, with a minimum ISR of 3.

 

 

  was:
Hey,

I have a Kafka Streams service that aggregates events from multiple input topics (running in a k8s cluster). The topology has multiple foreign-key joins (FKJs). The input topics had around 7 billion events when the service was started from `earliest`.

The service has EOS enabled and `transaction.timeout.ms` is `60`.

The problem I am having is frequent local state wipe-outs, which lead to very long rebalances. As can be seen from the attached images, local disk sizes go to ~0 very often. These wipe-outs are part of the EOS guarantee, based on this log message:
`State store transfer-store did not find checkpoint offsets while stores are 
not empty, since under EOS it has the risk of getting uncommitted data in 
stores we have to treat it as a task corruption error and wipe out the local 
state of task 1_8 before re-bootstrapping`
 

I noticed that this happens as a result of one of the following:
 * The process gets SIGKILLed when running out of memory, or on failure to shut down gracefully (on pod rotation, for example). This explains the missing local checkpoint file, but I thought local checkpoint updates were frequent, so I expected part of the state to be reset, not the whole local state.
 * Although we have a long transaction timeout configured, the following appears many times in the logs, after which Kafka Streams enters an error state. On startup, the local checkpoint file is not found:

`Transiting to abortable error state due to 
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.`
 

The service has 10 instances, all showing the same behaviour. The issue disappears when EOS is disabled.

The Kafka cluster runs Kafka 2.6, with a minimum ISR of 3.

 

 


[jira] [Updated] (KAFKA-14440) Local state wipeout with EOS

2022-12-03 Thread Abdullah alkhawatrah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdullah alkhawatrah updated KAFKA-14440:
-
Description: 
Hey,

I have a Kafka Streams service that aggregates events from multiple input topics (running in a k8s cluster). The topology has multiple foreign-key joins (FKJs). The input topics had around 7 billion events when the service was started from `earliest`.

The service has EOS enabled and `transaction.timeout.ms` is `60`.

The problem I am having is frequent local state wipe-outs, which lead to very long rebalances. As can be seen from the attached images, local disk sizes go to ~0 very often. These wipe-outs are part of the EOS guarantee, based on this log message:
`State store transfer-store did not find checkpoint offsets while stores are 
not empty, since under EOS it has the risk of getting uncommitted data in 
stores we have to treat it as a task corruption error and wipe out the local 
state of task 1_8 before re-bootstrapping`
 

I noticed that this happens as a result of one of the following:
 * The process gets SIGKILLed when running out of memory, or on failure to shut down gracefully (on pod rotation, for example). This explains the missing local checkpoint file, but I thought local checkpoint updates were frequent, so I expected part of the state to be reset, not the whole local state.
 * Although we have a long transaction timeout configured, the following appears many times in the logs, after which Kafka Streams enters an error state. On startup, the local checkpoint file is not found:

`Transiting to abortable error state due to 
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.`
 

The service has 10 instances, all showing the same behaviour. The issue disappears when EOS is disabled.

The Kafka cluster runs Kafka 2.6, with a minimum ISR of 3.

 

 

  was:
Hey,

I have a Kafka Streams service that aggregates events from multiple input topics (running in a k8s cluster). The topology has multiple foreign-key joins (FKJs). The input topics had around 7 billion events when the service was started from `earliest`.

The service has EOS enabled and `transaction.timeout.ms` is `60`.

The problem I am having is frequent local state wipe-outs, which lead to very long rebalances. As can be seen from the attached images, local disk sizes go to ~0 very often. These wipe-outs are part of the EOS guarantee, based on this log message:
State store transfer-store did not find checkpoint offsets while stores are not 
empty, since under EOS it has the risk of getting uncommitted data in stores we 
have to treat it as a task corruption error and wipe out the local state of 
task 1_8 before re-bootstrapping
 

I noticed that this happens as a result of one of the following:
 * The process gets SIGKILLed when running out of memory, or on failure to shut down gracefully (on pod rotation, for example). This explains the missing local checkpoint file, but I thought local checkpoint updates were frequent, so I expected part of the state to be reset, not the whole local state.
 * Although we have a long transaction timeout configured, the following appears many times in the logs, after which Kafka Streams enters an error state. On startup, the local checkpoint file is not found:

Transiting to abortable error state due to 
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.
 

The service has 10 instances, all showing the same behaviour. The issue disappears when EOS is disabled.

The Kafka cluster runs Kafka 2.6, with a minimum ISR of 3.

 

 


[jira] [Created] (KAFKA-14440) Local state wipeout with EOS

2022-12-03 Thread Abdullah alkhawatrah (Jira)
Abdullah alkhawatrah created KAFKA-14440:


 Summary: Local state wipeout with EOS
 Key: KAFKA-14440
 URL: https://issues.apache.org/jira/browse/KAFKA-14440
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.2.3
Reporter: Abdullah alkhawatrah
 Attachments: Screenshot 2022-12-02 at 09.26.27.png

Hey,

I have a Kafka Streams service that aggregates events from multiple input topics (running in a k8s cluster). The topology has multiple foreign-key joins (FKJs). The input topics had around 7 billion events when the service was started from `earliest`.

The service has EOS enabled and `transaction.timeout.ms` is `60`.

The problem I am having is frequent local state wipe-outs, which lead to very long rebalances. As can be seen from the attached images, local disk sizes go to ~0 very often. These wipe-outs are part of the EOS guarantee, based on this log message:
State store transfer-store did not find checkpoint offsets while stores are not empty, since under EOS it has the risk of getting uncommitted data in stores we have to treat it as a task corruption error and wipe out the local state of task 1_8 before re-bootstrapping
 

I noticed that this happens as a result of one of the following:
 * The process gets SIGKILLed when running out of memory, or on failure to shut down gracefully (on pod rotation, for example). This explains the missing local checkpoint file, but I thought local checkpoint updates were frequent, so I expected part of the state to be reset, not the whole local state.
 * Although we have a long transaction timeout configured, the following appears many times in the logs, after which Kafka Streams enters an error state. On startup, the local checkpoint file is not found:

Transiting to abortable error state due to 
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.
 

The service has 10 instances, all showing the same behaviour. The issue disappears when EOS is disabled.

The Kafka cluster runs Kafka 2.6, with a minimum ISR of 3.

 

 





[GitHub] [kafka] yashmayya commented on a diff in pull request #12800: KAFKA-14342: KafkaOffsetBackingStore should clear offsets for source partitions on tombstone messages

2022-12-03 Thread GitBox


yashmayya commented on code in PR #12800:
URL: https://github.com/apache/kafka/pull/12800#discussion_r1038756498


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##
@@ -329,6 +329,60 @@ public void testGetSetNull() throws Exception {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testTombstoneOffset() throws Exception {
+    expectConfigure();
+    expectStart(Collections.singletonList(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+            new RecordHeaders(), Optional.empty())));
+
+    Capture<org.apache.kafka.clients.producer.Callback> producerCallback = EasyMock.newCapture();
+    storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(producerCallback));
+    PowerMock.expectLastCall();
+
+    final Capture<Callback<Void>> readToEndCallback = EasyMock.newCapture();
+    storeLog.readToEnd(EasyMock.capture(readToEndCallback));
+    PowerMock.expectLastCall().andAnswer(() -> {
+        capturedConsumedCallback.getValue().onCompletion(null,
+                new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), null,
+                        new RecordHeaders(), Optional.empty()));
+        readToEndCallback.getValue().onCompletion(null, null);
+        return null;
+    });
+
+    expectStop();
+    expectClusterId();
+
+    PowerMock.replayAll();
+
+    store.configure(DEFAULT_DISTRIBUTED_CONFIG);
+    store.start();
+
+    // Write tombstone offset
+    Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
+    toSet.put(TP0_KEY, null);
+
+    final AtomicBoolean invoked = new AtomicBoolean(false);
+    Future<Void> setFuture = store.set(toSet, (error, result) -> invoked.set(true));
+    assertFalse(setFuture.isDone());
+    producerCallback.getValue().onCompletion(null, null);
+    setFuture.get(1, TimeUnit.MILLISECONDS);
+    assertTrue(invoked.get());
+
+    // Getting data should read to end of our published data and return it
+    Map<ByteBuffer, ByteBuffer> offsets = store.get(Collections.singletonList(TP0_KEY)).get(1, TimeUnit.MILLISECONDS);
+    assertNull(offsets.get(TP0_KEY));
+
+    // Just verifying that KafkaOffsetBackingStore::get returns null isn't enough; we also need to verify that the mapping for the source partition key is removed.
+    // This is because KafkaOffsetBackingStore::get returns null if either there is no existing offset for the source partition or if there is an offset with a null value.
+    // We need to make sure that tombstoned offsets are removed completely (i.e. that the mapping for the corresponding source partition is removed).
+    HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
+    assertFalse(data.containsKey(TP0_KEY));

Review Comment:
   I've done a rebase and rewritten the test. IMO just adding the `store.data.containsKey` check to the end of `testGetSetNull` wouldn't test the case that we're actually trying to cover with this change - i.e. that existing offsets are cleared from the in-memory map on tombstone offsets. `testGetSetNull` just checks that null keys and null values can be set without issues, and instead of rewriting it to first write a non-null value and then attempt to clear it with a tombstone, I felt a separate test case would be better.



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##
@@ -325,11 +325,12 @@ public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) {
         return producerCallback;
     }
 
-    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
-        @Override
-        public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
-            ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
-            ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
+    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
+        ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;

Review Comment:
   While the suggestion makes sense and I've added it, it looks like the 
`error` will never be non-null as of now.


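   For readers following along: the tombstone-clearing behavior this PR introduces can be sketched roughly as below (an illustration of the intent, not the PR's exact code):
```java
// Sketch: a tombstone (null value) removes the in-memory mapping instead of
// storing key -> null, so stale offsets do not linger in the store.
protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
    ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
    if (record.value() == null) {
        data.remove(key);                               // tombstone: clear the offset
    } else {
        data.put(key, ByteBuffer.wrap(record.value())); // normal update
    }
};
```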




[jira] [Commented] (KAFKA-14439) Specify returned errors for various APIs and versions

2022-12-03 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642800#comment-17642800
 ] 

Tom Bentley commented on KAFKA-14439:
-

FWIW I spent some time on this a few years ago, see 
[KAFKA-7787|https://issues.apache.org/jira/browse/KAFKA-7787].

> Specify returned errors for various APIs and versions
> -
>
> Key: KAFKA-14439
> URL: https://issues.apache.org/jira/browse/KAFKA-14439
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Priority: Major
>
> Kafka is known for supporting various clients and being compatible across 
> different versions. But one thing that is a bit unclear is what errors each 
> response can send. 
> Knowing what errors can come from each version helps those who implement 
> clients have a more defined spec for what errors they need to handle. When 
> new errors are added, it is clearer to the clients that changes need to be 
> made.
> It also helps contributors get a better understanding about how clients are 
> expected to react and potentially find and prevent gaps like the one found in 
> https://issues.apache.org/jira/browse/KAFKA-14417
> I briefly synced offline with [~hachikuji] about this and he suggested maybe 
> adding values for the error codes in the schema definitions of APIs that 
> specify the error codes and what versions they are returned on. One idea was 
> creating some enum type to accomplish this. 
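A purely illustrative sketch of the enum idea (the error codes shown are real Kafka error codes; the version strings are hypothetical placeholders):
{code:java}
// Illustrative only: tie each error code an API may return to the request
// versions that can return it, giving client authors a checkable spec.
public enum ProduceResponseError {
    NOT_LEADER_OR_FOLLOWER((short) 6, "0+"),
    INVALID_PRODUCER_EPOCH((short) 47, "0+");

    private final short code;
    private final String versions; // request versions that may see this error

    ProduceResponseError(short code, String versions) {
        this.code = code;
        this.versions = versions;
    }

    public short code() { return code; }
    public String versions() { return versions; }
}
{code}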


