Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-24 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2130312242

   cherry-picked on 3.7 git


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-24 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2129247940

   @gharris1727 please see https://github.com/apache/kafka/pull/16070





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-23 Thread via GitHub


gharris1727 commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2127631801

   Yeah, if a section like that doesn't exist yet we can start it.





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-23 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2127624210

   Do you mean a PR for a notable entry in docs/upgrade.html? Adding a section like
   ```
   Notable changes in 3.7.1
   ```





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-23 Thread via GitHub


gharris1727 commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2127588361

   From my earlier comment:
   
   > I think you can backport this once you have a full release note written 
that can be backported at the same time.
   
   Please open a PR for the release note first.





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-23 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2127581875

   @gharris1727 we would like to backport this to 3.7. Is that OK?





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-23 Thread via GitHub


edoardocomar merged PR #15910:
URL: https://github.com/apache/kafka/pull/15910





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-22 Thread via GitHub


gharris1727 commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2125340147

   @edoardocomar Since we're not substantially changing that class, I think 
it's acceptable to keep the old visibility or add the suppression, rather than 
fix the this-escape. It's up to you.





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-22 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2125147371

   Hi @gharris1727, more about warnings: there are two Java 21 compiler warnings that result in a compile failure:
   ```
   [2024-05-22T02:09:24.247Z] > Task :connect:mirror:compileJava
   [2024-05-22T02:09:24.247Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15910@2/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:78: warning: [this-escape] possible 'this' escape before subclass is fully initialized
   [2024-05-22T02:09:24.247Z] store = createBackingStore(config, consumer, admin);
   [2024-05-22T02:09:24.247Z]   ^
   [2024-05-22T02:09:24.247Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15910@2/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:94: warning: [this-escape] previous possible 'this' escape happens here via invocation
   [2024-05-22T02:09:24.247Z] (error, record) -> this.handleRecord(record),
   [2024-05-22T02:09:24.247Z] ^
   [2024-05-22T02:09:24.247Z] error: warnings found and -Werror specified
   [2024-05-22T02:09:24.247Z] 1 error
   [2024-05-22T02:09:24.247Z] 2 warnings
   [2024-05-22T02:09:24.247Z] 
   [2024-05-22T02:09:24.247Z] > Task :connect:mirror:compileJava FAILED
   ```
   
   Do you agree that, since we introduced calls to overridable methods in the constructor for test usage, this time we should suppress these warnings with the annotation `@SuppressWarnings("this-escape")`?
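
For context, here is a minimal, self-contained sketch of the hazard the `this-escape` lint points at. The classes `Base` and `Derived` are hypothetical and are not the Kafka classes; they only show that an overridable method invoked from a constructor runs before the subclass's fields have been initialized. The annotation is the suppression discussed above; these package-private demo classes would probably not trigger the lint by themselves.

```java
// Hypothetical classes for illustration only; not the Kafka OffsetSyncStore.
class Base {
    final String description;

    // The suppression under discussion; shown here only to illustrate placement.
    @SuppressWarnings("this-escape")
    Base() {
        // Calls an overridable method before any subclass field initializer has run.
        description = describe();
    }

    String describe() {
        return "base";
    }
}

class Derived extends Base {
    // Assigned only after Base's constructor has returned.
    private final StringBuilder name = new StringBuilder("derived");

    @Override
    String describe() {
        return "name=" + name;
    }
}

class ThisEscapeDemo {
    public static void main(String[] args) {
        Derived d = new Derived();
        System.out.println(d.description); // value captured while Base() was still running
        System.out.println(d.describe());  // value once construction has completed
    }
}
```

Whether that hazard matters depends on what the overriding method touches; a test subclass that only uses state owned by the base class is unaffected, which is one argument for suppressing the warning rather than restructuring the class.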





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-22 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2124241780

   Thanks @gharris1727 





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1609062958


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -105,12 +106,19 @@ private KafkaBasedLog 
createBackingStore(MirrorCheckpointConfig
 /**
  * Start the OffsetSyncStore, blocking until all previous Offset Syncs 
have been read from backing storage.
  */
-public void start() {
-backingStore.start();
+public void start(boolean initializationMustReadToEnd) {
+this.initializationMustReadToEnd = initializationMustReadToEnd;
+log.debug("OffsetSyncStore starting - must read to OffsetSync end = ", 
initializationMustReadToEnd);

Review Comment:
   Noticed one typo here: the format string has no `{}` placeholder, so the log message doesn't actually print the boolean.
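
To illustrate: SLF4J-style logging only substitutes an argument where the format string contains a `{}` placeholder, so an argument with no matching placeholder is silently dropped. The sketch below uses a simplified stand-in formatter (`render`, a hypothetical helper, not the SLF4J implementation) so that it runs without any dependency.

```java
// Simplified stand-in for SLF4J-style "{}" substitution; hypothetical helper,
// not the real org.slf4j implementation (no escaping, no Throwable handling).
class PlaceholderDemo {
    static String render(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = pattern.indexOf("{}", from);
            if (at < 0) {
                break; // no placeholder left: remaining arguments are dropped
            }
            out.append(pattern, from, at).append(arg);
            from = at + 2;
        }
        return out.append(pattern.substring(from)).toString();
    }

    public static void main(String[] args) {
        boolean initializationMustReadToEnd = true;
        // Without a placeholder the boolean never reaches the output.
        System.out.println(render("OffsetSyncStore starting - must read to OffsetSync end = ",
                initializationMustReadToEnd));
        // With a placeholder the value is appended where "{}" appears.
        System.out.println(render("OffsetSyncStore starting - must read to OffsetSync end = {}",
                initializationMustReadToEnd));
    }
}
```

In the real code the fix would presumably be just adding `{}` to the end of the message passed to `log.debug`.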






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2123528226

   @gharris1727 I gave up and used the ugly try. 
   That warning is not occurring in every test... But I went all the way in 
`OffsetSyncStoreTest` as I prefer consistency to beauty. 
   Removed a couple of warnings in MirrorCheckpointTaskTest too in the second 
commit





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608965614


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
 
 @Test
 public void testNoTranslationIfStoreNotStarted() {
-try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-// no offsets exist and store is not started
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
+FakeOffsetSyncStore store = new FakeOffsetSyncStore() {

Review Comment:
   > if you approve I'd also backport the fix to 3.7
   
   I'm on the fence about that, leaning towards yes. I regret backporting 
KAFKA-12468 so far and introducing this issue, and I didn't communicate it 
properly to users. I think you can backport this once you have a full release 
note written that can be backported at the same time.






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608959573


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
 
 @Test
 public void testNoTranslationIfStoreNotStarted() {
-try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-// no offsets exist and store is not started
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
+FakeOffsetSyncStore store = new FakeOffsetSyncStore() {

Review Comment:
   I have somewhat strong feelings, I wouldn't call them very strong. If 
someone noticed this IDE warning and created a ticket and a PR to fix it, I 
would review that. Am I going to make the build enforce this warning? No, but I 
have seen other situations where the warning did point out real resource 
leaks...
   
   I just wanted to save the effort required to go and rework this later, and 
prevent this PR from introducing an easily avoidable warning. I agree with you 
about suppressing warnings, I don't think that is a healthy practice to have.
   
   I just tried making this a try-with-resources and the indenting turned out 
fine. The body of backingStoreStart is at the exact same indentation as it is 
currently.
   ```
   try (FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
       @Override
       void backingStoreStart() {
           // read a sync during startup
           sync(tp, 100, 200);
           assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 0));
           assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 100));
           assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 200));
       }
   }) {
       // no offsets exist and store is not started
       assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
       assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
       assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));

       // After the store is started all offsets are visible
       store.start(true);

       assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
       assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
       assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
   }
   ```
   Here's the Consumer alternative I thought about, which uses one less 
indentation level at the cost of a variable, a field, and two constructors:
   ```
   Consumer<FakeOffsetSyncStore> init = store -> {
       // read a sync during startup
       store.sync(tp, 100, 200);
       assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
       assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
       assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
   };
   try (FakeOffsetSyncStore store = new FakeOffsetSyncStore(init)) {
       // no offsets exist and store is not started
       assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
       assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
       assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));

       // After the store is started all offsets are visible
       store.start(true);

       assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
       assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
       assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
   }
   ```
   Either of these is preferable to having the warning or suppressing it.
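
For anyone who wants to try the two shapes outside the Kafka build, here is a self-contained sketch under stated assumptions: `FakeStore` is a hypothetical stand-in for the test double (not `FakeOffsetSyncStore`), and the event list exists only to make the order of calls visible. Both variants close the resource when the `try` block exits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a test double; records what happened to it.
class FakeStore implements AutoCloseable {
    final List<String> events;
    private final Consumer<FakeStore> init;

    FakeStore(List<String> events) {
        this(events, store -> { });
    }

    FakeStore(List<String> events, Consumer<FakeStore> init) {
        this.events = events;
        this.init = init;
    }

    // Overridable startup hook, similar in spirit to backingStoreStart().
    void backingStoreStart() {
        init.accept(this);
    }

    void start() {
        backingStoreStart();
        events.add("started");
    }

    @Override
    public void close() {
        events.add("closed");
    }
}

class TryWithResourcesDemo {
    // Option 1: anonymous subclass declared directly in the try header.
    static List<String> withAnonymousSubclass() {
        List<String> log = new ArrayList<>();
        try (FakeStore store = new FakeStore(log) {
            @Override
            void backingStoreStart() {
                events.add("init-override");
            }
        }) {
            store.start();
        }
        return log;
    }

    // Option 2: pass the startup behaviour in as a Consumer.
    static List<String> withConsumer() {
        List<String> log = new ArrayList<>();
        Consumer<FakeStore> init = store -> store.events.add("init-consumer");
        try (FakeStore store = new FakeStore(log, init)) {
            store.start();
        }
        return log;
    }
}
```

The trade-off is the one described above: the first option needs no extra constructor or field, while the second keeps the `try` header on a single line at the cost of more plumbing in the fake.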






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


edoardocomar commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608907673


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
 
 @Test
 public void testNoTranslationIfStoreNotStarted() {
-try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-// no offsets exist and store is not started
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
+FakeOffsetSyncStore store = new FakeOffsetSyncStore() {

Review Comment:
   on another note, if you approve I'd also backport the fix to 3.7 






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


edoardocomar commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608896699


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
 
 @Test
 public void testNoTranslationIfStoreNotStarted() {
-try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-// no offsets exist and store is not started
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
-assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
+FakeOffsetSyncStore store = new FakeOffsetSyncStore() {

Review Comment:
   Hi @gharris1727 
   I use IntelliJ too and saw the warning. 
   I could have used a `@SuppressWarnings` annotation, but I am very reluctant to make code less readable because of linters' limited insight, and similarly reluctant to make the fake store more complex.
   
   Using try-with-resources with a local class results in horrible indentation, as you said.
   I don't share a strong worry about future leaks in tests; that seems speculative to me.
   In this instance, unless you have very strong feelings, I'd really leave the test as-is.
   






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2122938849

   @gharris1727 please review, thanks





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


edoardocomar commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608522984


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -155,6 +154,58 @@ public void testPastOffsetTranslation() {
 }
 }
 
+// this test has been written knowing the exact offsets syncs stored
+@Test
+public void testPastOffsetTranslationWithoutInitializationReadToEnd() {
+final int maxOffsetLag = 10;
+
+FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
+@Override
+void backingStoreStart() {
+for (int offset = 0; offset <= 1000; offset += maxOffsetLag) {
+sync(tp, offset, offset);
+assertSparseSyncInvariant(this, tp);
+}
+}
+};
+
+store.start(false);
+
+// After starting but before seeing new offsets
+assertTranslationsNearby(store, 400, 480, 0);
+assertTranslationsNearby(store, 500, 720, 480);
+assertTranslationsNearby(store, 1000, 1000, 990);
+
+for (int offset = 1000; offset <= 10000; offset += maxOffsetLag) {
+store.sync(tp, offset, offset);
+assertSparseSyncInvariant(store, tp);
+}
+
+// After seeing new offsets, 1000 was kicked out of the store, so
+// 1000 can only be translated to 1, only previously stored offset is 0
+assertTranslationsNearby(store, 1000, 3840, 0);
+
+// We can translate offsets between the latest startup offset and the 
latest offset with variable precision
+// Older offsets are less precise and translation ends up farther apart
+assertTranslationsNearby(store, 3840, 3840, 0);
+assertTranslationsNearby(store, 7680, 7680, 3840);
+assertTranslationsNearby(store, 8640, 8640, 7680);
+assertTranslationsNearby(store, 9120, 9120, 8640);
+assertTranslationsNearby(store, 9600, 9600, 9120);
+assertTranslationsNearby(store, 9840, 9840, 9600);
+assertTranslationsNearby(store, 9900, 9900, 9840);
+assertTranslationsNearby(store, 9960, 9960, 9900);
+assertTranslationsNearby(store, 9990, 9990, 9960);
+assertTranslationsNearby(store, 10000, 10000, 9990);

Review Comment:
   brilliant suggestion!






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


edoardocomar commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608507355


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -119,7 +118,7 @@ public void testPastOffsetTranslation() {
 store.sync(tp, offset, offset);
 assertSparseSyncInvariant(store, tp);

Review Comment:
   ok - we can assert sync is only called after start
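
One possible shape for such a guard, as a rough sketch only: `GuardedFakeStore` and its members are hypothetical names, and the real `FakeOffsetSyncStore` may enforce this differently. The flag is set at the beginning of `start()` so that syncs delivered while the store is starting up are still accepted.

```java
// Hypothetical test double; the real FakeOffsetSyncStore may look different.
class GuardedFakeStore {
    private boolean started = false;
    private int syncCount = 0;

    void start() {
        // Set first, so syncs read during startup are allowed.
        started = true;
    }

    // Fails fast if a test feeds the store a sync before start() was called.
    void sync(long upstreamOffset, long downstreamOffset) {
        if (!started) {
            throw new IllegalStateException("sync() called before start()");
        }
        syncCount++;
    }

    int syncCount() {
        return syncCount;
    }
}
```

A test that accidentally calls `sync` on an unstarted store then fails immediately instead of producing a misleading translation result.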






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


edoardocomar commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608471528


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java:
##
@@ -271,4 +284,102 @@ private Map 
assertCheckpointForTopic(
 assertEquals(truth, checkpoints.containsKey(remoteTp), "should" + 
(truth ? "" : " not") + " emit offset sync");
 return checkpoints;
 }
+
+@Test
+public void testCheckpointsTaskRestartUsesExistingCheckpoints() {

Review Comment:
   Thanks, we fixed the reassignments. 
   We already load the OffsetSyncStore with different OffsetSyncs, but we think that, when the task restarts, the CheckpointsStore should contain the exact last checkpoint emitted by the previous instance of the task.






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


edoardocomar commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608024060


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE;
+
+/**
+ * Reads once the Kafka log for checkpoints and populates a map of
+ * checkpoints per consumer group.
+ *
+ * The Kafka log is closed after the initial load and only the in memory map is
+ * used after start.
+ */
+class CheckpointsStore implements AutoCloseable {

Review Comment:
   Thanks for the feedback. The general practice is known, but we made this class package-private because the existing OffsetSyncStore is package-private too. We will revert CheckpointsStore and also make OffsetSyncStore public for consistency.
   






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-21 Thread via GitHub


edoardocomar commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1608024060


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE;
+
+/**
+ * Reads once the Kafka log for checkpoints and populates a map of
+ * checkpoints per consumer group.
+ *
+ * The Kafka log is closed after the initial load and only the in memory map is
+ * used after start.
+ */
+class CheckpointsStore implements AutoCloseable {

Review Comment:
   Thanks, the explanation of the access convention is clear. We made this class 
package-local only because the existing OffsetSyncStore is too; will revert. 
   






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-20 Thread via GitHub


gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1605384701


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE;
+
+/**
+ * Reads once the Kafka log for checkpoints and populates a map of
+ * checkpoints per consumer group.
+ *
+ * The Kafka log is closed after the initial load and only the in memory map is
+ * used after start.
+ */
+class CheckpointsStore implements AutoCloseable {

Review Comment:
   optional nit: This class can be public, along with the methods that are 
intended to be used by MirrorCheckpointTask, because this isn't a 
publicly documented package (like clients, or connect-api, etc.)
   
   
   Outside of those publicly documented packages, the general practice is 
public for external callers, even if the current callers are in the same 
package. We only use package-local for things that would be protected/private 
but need to be accessed in tests (and so come with the visibility comment).



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java:
##
@@ -271,4 +284,102 @@ private Map 
assertCheckpointForTopic(
 assertEquals(truth, checkpoints.containsKey(remoteTp), "should" + 
(truth ? "" : " not") + " emit offset sync");
 return checkpoints;
 }
+
+@Test
+public void testCheckpointsTaskRestartUsesExistingCheckpoints() {

Review Comment:
   I think using "real checkpoints" generated by the first MirrorCheckpointTask 
to test the second MirrorCheckpointTask is not necessary, and you can use 
simulated checkpoints instead.
   
   Reassigning variables and copy-pasting sections in tests is typo-prone and I 
think we can avoid it here.
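
   As a rough illustration of the "simulated checkpoints" idea, a test could 
seed the store directly with the state a restarted task is expected to find. 
This is only a sketch: `SeededCheckpointsSketch`, its `simulated()` helper, and 
the use of plain strings and longs in place of `TopicPartition` and 
`Checkpoint` are assumptions made so the example compiles without Kafka on the 
classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified store: partitions are plain strings and a checkpoint is
// reduced to its downstream offset, so the sketch compiles without Kafka.
class SeededCheckpointsSketch {
    private final Map<String, Map<String, Long>> checkpointsPerGroup;

    // Mirrors the idea of a "for testing" constructor that accepts preloaded state.
    SeededCheckpointsSketch(Map<String, Map<String, Long>> preloaded) {
        this.checkpointsPerGroup = preloaded;
    }

    // Returns the last known downstream offset, or null if none was loaded.
    Long lastDownstreamOffset(String group, String partition) {
        Map<String, Long> perPartition = checkpointsPerGroup.get(group);
        return perPartition == null ? null : perPartition.get(partition);
    }

    // Builds the state a restarted task would find, without running a first task.
    static SeededCheckpointsSketch simulated() {
        Map<String, Long> perPartition = new HashMap<>();
        perPartition.put("topic1-0", 100L);
        Map<String, Map<String, Long>> perGroup = new HashMap<>();
        perGroup.put("group1", perPartition);
        return new SeededCheckpointsSketch(perGroup);
    }
}
```

   A restart test can then assert against `simulated()` directly, with no 
variables reassigned from a first task.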



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -155,6 +154,58 @@ public void testPastOffsetTranslation() {
 }
 }
 
+// this test has been wriiten knowing the exact offsets syncs stored
+@Test
+public void testPastOffsetTranslationWithoutInitializationReadToEnd() {
+final int maxOffsetLag = 10;
+
+FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
+@Override
+void backingStoreStart() {
+for (int offset = 0; offset <= 1000; offset += maxOffsetLag) {
+sync(tp, offset, offset);
+assertSparseSyncInvariant(this, tp);
+}
+}
+};
+
+store.start(false);
+
+// After starting but before seeing new offsets
+assertTranslationsNearby(store, 400, 480, 0);
+assertTranslationsNearby(store, 500, 720, 480);
+assertTranslationsNearby(store, 1000, 1000, 990);
+
+for (int offset = 1000; offset <= 1; offset += maxOffsetLag) {
+store.sync(tp, offset, offset);
+assertSparseSyncInvariant(store, tp);
+}
+
+// After seeing new offsets, 1000 was kicked out of the store, so
+// 1000 can only be traslated to 1, only previously stored offset is 0
+assertTranslationsNearby(store, 1000, 3840, 0);
+
+// We can translate offsets between the latest startup offset and the 
latest offset

Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-20 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2120792114

   Hi @gharris1727, if you have the time, could you please take another look? 
Thanks.





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-17 Thread via GitHub


prestona commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2118027385

   Hi @gharris1727, hopefully the latest commits address your review comments. 
Once again, really appreciate all your feedback and suggestions.





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-17 Thread via GitHub


edoardocomar commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1604720566


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -105,7 +106,10 @@ private KafkaBasedLog 
createBackingStore(MirrorCheckpointConfig
 /**
  * Start the OffsetSyncStore, blocking until all previous Offset Syncs 
have been read from backing storage.
  */
-public void start() {
+public void start(boolean initializationMustReadToEnd) {
+this.initializationMustReadToEnd = initializationMustReadToEnd;
+log.info("OffsetSyncStore initializationMustReadToEnd:{}{}", 
initializationMustReadToEnd,
+initializationMustReadToEnd ? " - fewer checkpoints may be 
emitted" : "");

Review Comment:
   thanks we reworded the message where we catch the error, though we didn't 
write the offsets as there may be many groups handled by a task 






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-16 Thread via GitHub


prestona commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1603914389


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java:
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE;
+
+/**
+ * Reads once the Kafka log for checkpoints and populates a map of
+ * checkpoints per consumer group
+ */
+public class CheckpointsStore implements AutoCloseable {
+
+private static final Logger log = LoggerFactory.getLogger(CheckpointsStore.class);
+
+private final MirrorCheckpointTaskConfig config;
+private final Set<String> consumerGroups;
+
+private TopicAdmin cpAdmin = null;
+private KafkaBasedLog<byte[], byte[]> backingStore = null;
+private Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup;
+
+private volatile boolean loadSuccess = false;
+private volatile boolean isInitialized = false;
+
+public CheckpointsStore(MirrorCheckpointTaskConfig config, Set<String> consumerGroups) {
+this.config = config;
+this.consumerGroups = new HashSet<>(consumerGroups);
+}
+
+// for testing
+CheckpointsStore(Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup) {
+this.config = null;
+this.consumerGroups = null;
+this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
+isInitialized = true;
+loadSuccess =  true;
+}
+
+// potentially long running
+public void start()  {
+checkpointsPerConsumerGroup = readCheckpoints();
+isInitialized = true;
+log.trace("Checkpoints store content : {}", checkpointsPerConsumerGroup);
+}
+
+public boolean loadSuccess() {
+return loadSuccess;
+}
+
+public boolean isInitialized() {
+return isInitialized;
+}
+
+
+// return a mutable map - it is expected to be mutated by the Task
+public Map<String, Map<TopicPartition, Checkpoint>> contents() {

Review Comment:
   Appreciate the feedback. We went back and forth on whether to break 
encapsulation or risk `CheckpointsStore` increasingly implementing all the 
methods of `Map`.






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-16 Thread via GitHub


prestona commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1603914389


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java:
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE;
+
+/**
+ * Reads once the Kafka log for checkpoints and populates a map of
+ * checkpoints per consumer group
+ */
+public class CheckpointsStore implements AutoCloseable {
+
+private static final Logger log = LoggerFactory.getLogger(CheckpointsStore.class);
+
+private final MirrorCheckpointTaskConfig config;
+private final Set<String> consumerGroups;
+
+private TopicAdmin cpAdmin = null;
+private KafkaBasedLog<byte[], byte[]> backingStore = null;
+private Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup;
+
+private volatile boolean loadSuccess = false;
+private volatile boolean isInitialized = false;
+
+public CheckpointsStore(MirrorCheckpointTaskConfig config, Set<String> consumerGroups) {
+this.config = config;
+this.consumerGroups = new HashSet<>(consumerGroups);
+}
+
+// for testing
+CheckpointsStore(Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup) {
+this.config = null;
+this.consumerGroups = null;
+this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
+isInitialized = true;
+loadSuccess =  true;
+}
+
+// potentially long running
+public void start()  {
+checkpointsPerConsumerGroup = readCheckpoints();
+isInitialized = true;
+log.trace("Checkpoints store content : {}", checkpointsPerConsumerGroup);
+}
+
+public boolean loadSuccess() {
+return loadSuccess;
+}
+
+public boolean isInitialized() {
+return isInitialized;
+}
+
+
+// return a mutable map - it is expected to be mutated by the Task
+public Map<String, Map<TopicPartition, Checkpoint>> contents() {

Review Comment:
   Appreciate the feedback. We went back and forth on whether to break 
encapsulation (at the risk of `CheckpointsStore` increasingly implementing 
all the methods of `Map`). 






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-16 Thread via GitHub


gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1603861223


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -395,7 +401,7 @@ void syncGroupOffset(String consumerGroupId, 
Map> 
getConvertedUpstreamOffset() {
 Map> result = new 
HashMap<>();
 
-for (Entry> entry : 
checkpointsPerConsumerGroup.entrySet()) {
+for (Entry> entry : 
checkpointsStore.contents().entrySet()) {
 String consumerId = entry.getKey();

Review Comment:
   This getConvertedUpstreamOffset could be moved to CheckpointsStore since it 
only depends on the checkpoint store state.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -69,20 +69,22 @@ public class MirrorCheckpointTask extends SourceTask {
 private MirrorCheckpointMetrics metrics;
 private Scheduler scheduler;
 private Map> 
idleConsumerGroupsOffset;
-private Map> 
checkpointsPerConsumerGroup;
+private CheckpointsStore checkpointsStore;
+
 public MirrorCheckpointTask() {}
 
 // for testing
 MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias,
-ReplicationPolicy replicationPolicy, OffsetSyncStore 
offsetSyncStore,
-Map> 
idleConsumerGroupsOffset,
-Map> 
checkpointsPerConsumerGroup) {
+ ReplicationPolicy replicationPolicy, OffsetSyncStore 
offsetSyncStore, Set consumerGroups,
+ Map> 
idleConsumerGroupsOffset,
+ CheckpointsStore checkpointsStore) {

Review Comment:
   nit: indenting



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -105,7 +106,10 @@ private KafkaBasedLog 
createBackingStore(MirrorCheckpointConfig
 /**
  * Start the OffsetSyncStore, blocking until all previous Offset Syncs 
have been read from backing storage.
  */
-public void start() {
+public void start(boolean initializationMustReadToEnd) {
+this.initializationMustReadToEnd = initializationMustReadToEnd;
+log.info("OffsetSyncStore initializationMustReadToEnd:{}{}", 
initializationMustReadToEnd,
+initializationMustReadToEnd ? " - fewer checkpoints may be 
emitted" : "");

Review Comment:
   nit: Make this more verbose and user-oriented. They don't care that the 
variable is called initializationMustReadToEnd, and "Must read to end" is a 
very technical description of what is happening here.
   Specify more precisely which checkpoints aren't being emitted. Fewer could 
mean every other one, but it's actually offsets which were mirrored before the 
task started.
   
   Actually, this message works better if you put it after the 
backingStore.start() call: You can print out the oldest offset sync to say that 
translation is starting there, and whether this is limited by the 
initialization setting.
   
   



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -39,10 +39,12 @@ static class FakeOffsetSyncStore extends OffsetSyncStore {
 super();
 }
 
-@Override
-public void start() {

Review Comment:
   bump on this comment, now that we're converging on the final design.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -172,7 +178,7 @@ private List sourceRecordsForGroup(String 
group) throws Interrupte
 long timestamp = System.currentTimeMillis();
 Map upstreamGroupOffsets = 
listConsumerGroupOffsets(group);
 Map newCheckpoints = 
checkpointsForGroup(upstreamGroupOffsets, group);
-Map oldCheckpoints = 
checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>());
+Map oldCheckpoints = 
checkpointsStore.contents().computeIfAbsent(group, ignored -> new HashMap<>());
 oldCheckpoints.putAll(newCheckpoints);

Review Comment:
   Move this to a new `CheckpointsStore#emitCheckpoints(Map)` method



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -195,7 +201,7 @@ Map 
checkpointsForGroup(Map checkpoints = 
checkpointsPerConsumerGroup.get(checkpoint.consumerGroupId());
+Map checkpoints = 
checkpointsStore.contents().get(checkpoint.consumerGroupId());
 if (checkpoints == null) {
 log.trace("Emitting {} (first for this group)", checkpoint);
 return true;

Review Comment:
   This can be moved to a new `CheckpointsStore#get(String, TopicPartition)` 
method.
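
   To make the direction of these suggestions concrete, here is a minimal 
sketch of a store that hides its map behind `get` and `emitCheckpoints`, so 
the task never touches `contents()`. It is an illustration under assumptions, 
not the PR's code: the class name `CheckpointsStoreSketch` is hypothetical, the 
type parameters `TP` and `CP` stand in for `TopicPartition` and `Checkpoint`, 
and the extra `group` argument on `emitCheckpoints` is a guess at what the 
method would need.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: TP stands in for TopicPartition, CP for Checkpoint.
class CheckpointsStoreSketch<TP, CP> {
    private final Map<String, Map<TP, CP>> checkpointsPerConsumerGroup = new HashMap<>();

    // Returns the last checkpoint recorded for this group and partition, or null if none.
    CP get(String group, TP topicPartition) {
        Map<TP, CP> checkpoints = checkpointsPerConsumerGroup.get(group);
        return checkpoints == null ? null : checkpoints.get(topicPartition);
    }

    // Records newly emitted checkpoints for a group, overwriting older entries per partition.
    void emitCheckpoints(String group, Map<TP, CP> newCheckpoints) {
        checkpointsPerConsumerGroup
            .computeIfAbsent(group, ignored -> new HashMap<>())
            .putAll(newCheckpoints);
    }
}
```

   With only these two entry points, the store does not need to mirror the 
rest of the `Map` interface, and the mutable map stays private.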



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java:
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ *

Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-16 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2115715035

   Hi @gharris1727, we worked out the asynchronous loading using a wrapper 
around the checkpointsPerGroupMap.
   However, when testing with different levels of authorization to see the 
fallback behaviour, 
   the simplest approach was to have the callback rethrow. It's all 
encapsulated, so IMHO it doesn't spoil the task!
   





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-15 Thread via GitHub


edoardocomar commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1602265328


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -105,7 +106,13 @@ private KafkaBasedLog 
createBackingStore(MirrorCheckpointConfig
 /**
  * Start the OffsetSyncStore, blocking until all previous Offset Syncs 
have been read from backing storage.
  */
-public void start() {
+public void start(boolean initializationMustReadToEnd) {
+this.initializationMustReadToEnd = initializationMustReadToEnd;
+if (initializationMustReadToEnd) {
+log.warn("OffsetSyncStore initializationMustReadToEnd = {}", 
initializationMustReadToEnd);

Review Comment:
   I think that getting the fallback degraded behavior is not the normal case 
and therefore I strongly prefer a warn here.
   However here we do know the reason why we are getting this - so we should 
explain that in the record handler as suggested






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-15 Thread via GitHub


prestona commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1602244281


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -103,10 +113,11 @@ public void start(Map props) {
 targetAdminClient = 
config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
 metrics = config.metrics();
 idleConsumerGroupsOffset = new HashMap<>();
-checkpointsPerConsumerGroup = new HashMap<>();
+Optional>> checkpoints = 
readCheckpoints(config);

Review Comment:
   Thanks for the explanation, and suggestion. We'll take another look at 
reading the checkpoint topic asynchronously from the start() method.






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-15 Thread via GitHub


prestona commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1602234146


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -116,6 +127,73 @@ public void start(Map props) {
 consumerGroups.size(), sourceClusterAlias, 
config.targetClusterAlias(), consumerGroups);
 }
 
+// read the checkpoints topic to initialize the 
checkpointsPerConsumerGroup state of this task
+// the callback may only handle errors thrown by consumer.poll in 
KafkaBasedLog
+// e.g. unauthorized to read from topic (non-retriable)
+// if any are encountered, treat the loading of Checkpoints as failed.
+Optional>> 
readCheckpoints(MirrorCheckpointTaskConfig config) {
+AtomicBoolean successful = new AtomicBoolean(true);
+Map> checkpoints = new 
HashMap<>();
+Callback> consumedCallback = new 
Callback>() {
+@Override
+public void onCompletion(Throwable error, ConsumerRecord cpRecord) {
+if (error != null && successful.getAndSet(false)) {
+log.error("Error loading Checkpoint topic", error);

Review Comment:
   If we special case not authorized (as above), then the main reasons for 
hitting this are (hopefully?) transitory problems - for example: all brokers 
being down when the connector is first started. I agree that this should be a 
warning with a better explanation of impact.






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-15 Thread via GitHub


prestona commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1602207815


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -105,7 +106,13 @@ private KafkaBasedLog 
createBackingStore(MirrorCheckpointConfig
 /**
  * Start the OffsetSyncStore, blocking until all previous Offset Syncs 
have been read from backing storage.
  */
-public void start() {
+public void start(boolean initializationMustReadToEnd) {
+this.initializationMustReadToEnd = initializationMustReadToEnd;
+if (initializationMustReadToEnd) {
+log.warn("OffsetSyncStore initializationMustReadToEnd = {}", 
initializationMustReadToEnd);

Review Comment:
   Our intention was to alert the Kafka admin that they are getting degraded 
behavior. However, reflecting on this, I wonder if it would be better to special 
case "not authorized" such that:
   1. For not authorized, we emit a warning (with a better-worded explanation), 
because the Kafka admin can choose to take an action that improves the 
frequency of checkpoints.
   2. For other cases, we log at debug level.
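
   The two cases above could be sketched roughly as follows. Everything here is 
an assumption for illustration: `LoadFailureLogging`, its `Level` enum, and the 
message wording are hypothetical, and the local `AuthorizationException` is 
only a stand-in for Kafka's `org.apache.kafka.common.errors.AuthorizationException` 
so the example compiles on its own.

```java
// Stand-in for org.apache.kafka.common.errors.AuthorizationException (assumption:
// declared locally so the sketch compiles without Kafka on the classpath).
class AuthorizationException extends RuntimeException {
    AuthorizationException(String message) {
        super(message);
    }
}

// Hypothetical helper: picks the log level and message for a failed checkpoints load.
class LoadFailureLogging {
    enum Level { WARN, DEBUG }

    // Only a missing permission is something the admin can act on, so only that case warns.
    static Level levelFor(Throwable error) {
        return error instanceof AuthorizationException ? Level.WARN : Level.DEBUG;
    }

    // Illustrative wording only; the real message would be agreed in review.
    static String messageFor(Throwable error) {
        if (error instanceof AuthorizationException) {
            return "Not authorized to read the checkpoints topic; fewer checkpoints may be "
                + "emitted. Granting read access may improve checkpoint frequency.";
        }
        return "Could not read the checkpoints topic: " + error;
    }
}
```

   Keeping the decision in one place means the record handler only has to ask 
for a level and a message, whichever logger call it ends up using.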






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-15 Thread via GitHub


gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1602156155


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -103,10 +113,11 @@ public void start(Map props) {
 targetAdminClient = 
config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
 metrics = config.metrics();
 idleConsumerGroupsOffset = new HashMap<>();
-checkpointsPerConsumerGroup = new HashMap<>();
+Optional>> checkpoints = 
readCheckpoints(config);

Review Comment:
   This is a potentially long blocking operation, and those should be avoided 
in start() methods because while the task is starting, it can't be stopped, and 
if the task can't be stopped within `task.shutdown.graceful.timeout.ms` it is 
aggressively cancelled.
   
   Since the main thread needs the result from readCheckpoints, I think it 
would be fine to check if it's been loaded and if not, just return an empty 
poll().
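
   A minimal sketch of that shape, assuming the load can run on another thread: 
`start()` only kicks off the read, and `poll()` returns an empty batch until 
the result is available. `LazyLoadingTaskSketch` and its `slowLoader` supplier 
are hypothetical stand-ins (strings replace checkpoints and source records); 
this is not the real `MirrorCheckpointTask`.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of a task whose start() never blocks on the initial load.
class LazyLoadingTaskSketch {
    // Stand-in for the potentially long read of the checkpoints topic.
    private final Supplier<List<String>> slowLoader;
    private volatile CompletableFuture<List<String>> loaded;

    LazyLoadingTaskSketch(Supplier<List<String>> slowLoader) {
        this.slowLoader = slowLoader;
    }

    void start() {
        // Returns immediately; the load runs on a background thread.
        loaded = CompletableFuture.supplyAsync(slowLoader);
    }

    List<String> poll() {
        CompletableFuture<List<String>> future = loaded;
        if (future == null || !future.isDone()) {
            return Collections.emptyList(); // not loaded yet: emit nothing this round
        }
        // join() does not block here; it rethrows a load failure as CompletionException.
        return future.join();
    }
}
```

   Because `start()` returns right away, the task stays stoppable while the 
load is in progress. How a failed load should degrade is left open in this 
sketch.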



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -116,6 +127,73 @@ public void start(Map props) {
 consumerGroups.size(), sourceClusterAlias, 
config.targetClusterAlias(), consumerGroups);
 }
 
+// read the checkpoints topic to initialize the 
checkpointsPerConsumerGroup state of this task
+// the callback may only handle errors thrown by consumer.poll in 
KafkaBasedLog
+// e.g. unauthorized to read from topic (non-retriable)
+// if any are encountered, treat the loading of Checkpoints as failed.
+Optional>> 
readCheckpoints(MirrorCheckpointTaskConfig config) {
+AtomicBoolean successful = new AtomicBoolean(true);
+Map> checkpoints = new 
HashMap<>();
+Callback> consumedCallback = new 
Callback>() {
+@Override
+public void onCompletion(Throwable error, ConsumerRecord cpRecord) {
+if (error != null && successful.getAndSet(false)) {
+log.error("Error loading Checkpoint topic", error);

Review Comment:
   I'm on the fence about whether this should be error or warn. Is it something 
that the user _must_ address? I'm not so sure.
   
   I do think that this should have an actionable recommendation, or an 
explanation that the task is gracefully degrading because of this.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -105,7 +106,13 @@ private KafkaBasedLog 
createBackingStore(MirrorCheckpointConfig
 /**
  * Start the OffsetSyncStore, blocking until all previous Offset Syncs 
have been read from backing storage.
  */
-public void start() {
+public void start(boolean initializationMustReadToEnd) {
+this.initializationMustReadToEnd = initializationMustReadToEnd;
+if (initializationMustReadToEnd) {
+log.warn("OffsetSyncStore initializationMustReadToEnd = {}", 
initializationMustReadToEnd);

Review Comment:
   debug level, this is not worth warning about.
   
   :+1: for the variable name






Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-15 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2112731563

   Hi @gharris1727, commit e33edd2a72 hopefully addresses most of your comments. 
Thanks for the quick feedback.
   
   We also noticed that the loading of the checkpoints must complete before the 
task's start method completes. 
   This prevents the checkpointsPerConsumerGroup map from being accessed during 
the task's active polls before it is completely initialized. So we moved the 
loading before scheduler.execute.
   
   





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-14 Thread via GitHub


gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1600452206


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -116,6 +125,71 @@ public void start(Map props) {
 consumerGroups.size(), sourceClusterAlias, 
config.targetClusterAlias(), consumerGroups);
 }
 
+    // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task
+    private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+        class CheckpointRecordHandler {
+            private volatile KafkaException lastLoggedErrorReadingCheckpoints = null;
+
+            void handle(Throwable error, ConsumerRecord cpRecord) {
+                // See KafkaBasedLog.poll : only KafkaException can be passed as error
+                if (error instanceof KafkaException) {
+                    // only log once
+                    if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) {
+                        log.error("Error loading Checkpoint topic", error);
+                        lastLoggedErrorReadingCheckpoints = (KafkaException) error;
+                    }
+
+                    if (error instanceof RetriableException) {
+                        return;
+                    } else {
+                        throw (KafkaException) error;
+                    }
+                } else { // error is null
+                    lastLoggedErrorReadingCheckpoints = null;
+                    Checkpoint cp = Checkpoint.deserializeRecord(cpRecord);

Review Comment:
   deserialization can fail due to bad data in the topic
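
   One possible way to tolerate bad records, sketched under assumptions: the 
`group:upstream:downstream` format, the `Entry` class and `decode` are 
hypothetical stand-ins, not `Checkpoint.deserializeRecord`. Skipping and 
logging is only one option; failing the whole load would be another.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class TolerantCheckpointLoad {
    // Simplified stand-in for a decoded checkpoint; not the real Checkpoint class.
    static final class Entry {
        final String group;
        final long upstream;
        final long downstream;

        Entry(String group, long upstream, long downstream) {
            this.group = group;
            this.upstream = upstream;
            this.downstream = downstream;
        }
    }

    // Hypothetical decoder for "group:upstream:downstream"; throws on malformed input.
    static Entry decode(byte[] value) {
        String[] parts = new String(value, StandardCharsets.UTF_8).split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected 3 fields, got " + parts.length);
        }
        return new Entry(parts[0], Long.parseLong(parts[1]), Long.parseLong(parts[2]));
    }

    // Wraps decode so that a bad record is logged and skipped instead of aborting the load.
    static Optional<Entry> tryDecode(byte[] value) {
        try {
            return Optional.of(decode(value));
        } catch (RuntimeException e) {
            System.err.println("Skipping undecodable checkpoint record: " + e);
            return Optional.empty();
        }
    }

    // Keeps the latest decodable entry per group, in record order.
    static Map<String, Entry> load(List<byte[]> records) {
        Map<String, Entry> latest = new HashMap<>();
        for (byte[] record : records) {
            tryDecode(record).ifPresent(entry -> latest.put(entry.group, entry));
        }
        return latest;
    }

    public static void main(String[] args) {
        List<byte[]> records = List.of(
                "g1:500:480".getBytes(StandardCharsets.UTF_8),
                "garbage".getBytes(StandardCharsets.UTF_8));
        System.out.println("loaded groups: " + load(records).keySet());
    }
}
```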



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -116,6 +125,71 @@ public void start(Map props) {
 consumerGroups.size(), sourceClusterAlias, 
config.targetClusterAlias(), consumerGroups);
 }
 
+    // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task
+    private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+        class CheckpointRecordHandler {
+            private volatile KafkaException lastLoggedErrorReadingCheckpoints = null;
+
+            void handle(Throwable error, ConsumerRecord cpRecord) {
+                // See KafkaBasedLog.poll : only KafkaException can be passed as error
+                if (error instanceof KafkaException) {
+                    // only log once
+                    if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) {
+                        log.error("Error loading Checkpoint topic", error);
+                        lastLoggedErrorReadingCheckpoints = (KafkaException) error;
+                    }
+
+                    if (error instanceof RetriableException) {
+                        return;
+                    } else {
+                        throw (KafkaException) error;
+                    }
+                } else { // error is null
+                    lastLoggedErrorReadingCheckpoints = null;
+                    Checkpoint cp = Checkpoint.deserializeRecord(cpRecord);
+                    if (consumerGroups.contains(cp.consumerGroupId())) {
+                        Map cps = checkpointsPerConsumerGroup.computeIfAbsent(cp.consumerGroupId(), ignored -> new HashMap<>());
+                        cps.put(cp.topicPartition(), cp);
+                    }
+                }
+            }
+        }
+
+        CheckpointRecordHandler handler = new CheckpointRecordHandler();
+        TopicAdmin cpAdmin = null;
+        KafkaBasedLog previousCheckpoints = null;
+
+        try {
+            cpAdmin = new TopicAdmin(
+                    config.targetAdminConfig("checkpoint-target-admin"),
+                    config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin")));
+
+            previousCheckpoints = KafkaBasedLog.withExistingClients(
+                    config.checkpointsTopic(),
+                    MirrorUtils.newConsumer(config.targetConsumerConfig(CHECKPOINTS_TARGET_CONSUMER_ROLE)),
+                    null,
+                    cpAdmin,
+                    (error, cpRecord) -> handler.handle(error, cpRecord),
+                    Time.SYSTEM,
+                    ignored -> { },
+                    topicPartition -> topicPartition.partition() == 0);
+
+            log.info("Starting loading Checkpoint topic : {}", config.checkpointsTopic());
+            previousCheckpoints.start(true);
+            previousCheckpoints.stop();
+            log.info("Finished loading Checkpoint topic : {}", config.checkpointsTopic());
+            log.debug("Initial checkpointsPerConsumerGroup : {}", checkpointsPerConsumerGroup);
+            return true;
+        }  catch (KafkaException kexc) 

Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-14 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2110419872

   Looks like we should rename this PR, as it mainly addresses
   https://issues.apache.org/jira/browse/KAFKA-16622 





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-14 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2110417331

   Testing results for the following scenario:
   
   - produce 1 records
   - consume them 250 at a time
   - stop mm2
   - produce 1 records
   - restart consuming 
   - restart mm2
   
   
   Emitted Checkpoints:
   ```
   % bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
     --topic source.checkpoints.internal \
     --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
     --from-beginning
   ```
   
   NEW implementation with checkpoints read by Checkpoint task
   ```
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=500, downstreamOffset=1, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=6250, downstreamOffset=5820, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=7250, downstreamOffset=7228, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=8750, downstreamOffset=8658, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=9500, downstreamOffset=9362, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=9750, downstreamOffset=9714, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=1, downstreamOffset=1, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=12250, downstreamOffset=11519, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=14500, downstreamOffset=14335, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=17500, downstreamOffset=17195, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=18750, downstreamOffset=18603, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=19500, downstreamOffset=19307, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=19750, downstreamOffset=19659, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=2, downstreamOffset=2, metadata=}
   ```
   
   NEW implementation when the Checkpoint task FAILED to read the checkpoints
   ```
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=1, downstreamOffset=1, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=11250, downstreamOffset=1, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=16000, downstreamOffset=15919, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=17500, downstreamOffset=17349, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=19000, downstreamOffset=18757, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=19500, downstreamOffset=19461, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=2, downstreamOffset=2, metadata=}
   ```
   
   Original implementation (prior to this PR)
   ```
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=1, downstreamOffset=1, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=11250, downstreamOffset=1, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=16250, downstreamOffset=16249, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=18000, downstreamOffset=17987, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=19000, downstreamOffset=18768, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=19750, downstreamOffset=19747, metadata=}
   Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=2, downstreamOffset=2, metadata=}
   ```
   





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-14 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2109896449

   @gharris1727 thanks for your feedback. We've added another commit that falls 
back to the old OffsetSyncs load behavior in case the task cannot read the 
checkpoints.





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-10 Thread via GitHub


gharris1727 commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2104915418

   @edoardocomar In general connectors do have to add a configuration like this 
eventually, because users have different tolerances for errors. Some users want 
the errors to cause the connector to become FAILED, so that they can see the 
exception in the REST API and retry it explicitly. Other users want the 
connector to retry internally infinitely, and not fail for any reason.
   
   MM2 has a _lot_ of operations that can fail, and virtually none of them 
cause the connector to fail. The reason for this is that MM2 has a dedicated 
mode, where there isn't a REST API to surface errors or perform external 
retries, so external retries are very expensive. It is definitely something 
that could be fixed eventually with a "strict mode" configuration or 
similar. We've also considered ways to address this from the framework side, 
with retry policies and automatic restarts, but none of that has been fully 
designed or implemented yet.
   
   I think we should not block this fix on solving that more general problem. 
If there is a permissions error loading the checkpoints, MM2 should log that, 
and then degrade gracefully to the current behavior. We can have a KIP that 
adds a "strict mode" to make this failure surface and to make this new 
permission required.
   
   In practical terms, without a configuration and with the graceful 
degradation implementation, we can get this into 3.8.
   If you're interested in the configuration, that will delay this feature 
until 4.0. I'm fine with either, but I think the current behavior has caused 
such considerable friction in the community that we should prefer a 3.8 release.
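
   A rough sketch of the graceful-degradation idea, with an optional strict 
flag. Everything here is hypothetical: no "strict mode" configuration exists 
yet, and the loader, the map types and the `Mode` enum are illustrative names 
only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class CheckpointInitSketch {
    enum Mode { RESUMED_FROM_CHECKPOINTS, DEGRADED_TO_OLD_BEHAVIOR }

    // strictMode is a hypothetical flag: when true the failure surfaces to the caller,
    // when false the failure is logged and the task continues with the old behavior.
    static Mode initialize(Supplier<Map<String, Long>> loader,
                           Map<String, Long> state,
                           boolean strictMode) {
        try {
            state.putAll(loader.get());
            return Mode.RESUMED_FROM_CHECKPOINTS;
        } catch (RuntimeException e) {
            if (strictMode) {
                throw e;
            }
            System.err.println("Could not load checkpoints, continuing with previous behavior: " + e);
            state.clear(); // discard anything partially loaded
            return Mode.DEGRADED_TO_OLD_BEHAVIOR;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        Mode mode = initialize(() -> {
            throw new SecurityException("not authorized to read checkpoints topic");
        }, state, false);
        System.out.println(mode);
    }
}
```

   With the flag off this matches the 3.8-friendly option: an operator whose 
ACLs grant write but not read on the checkpoints topic keeps running as before.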





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-10 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2104835965

   Hi @gharris1727, we're now handling errors in loading the Checkpoints topic
   (we still have to add unit tests).
   
   Specifically, we tested the case where the task is not authorized to read 
the topic, which the existing KafkaBasedLog was not handling well.
   At this stage the task start would fail, which to us seems an 
improvement as it is detectable and actionable (expecting the change to be 
noted in the release notes).
   
   This looks to us like better behavior than reverting to the old one in case 
of failure, as maintaining and testing two modes of operation seems too complex.
   
   Do you still think we need a KIP to introduce yet another config to choose 
between the old behavior (default) and the new one (arguably better in the eyes 
of this PR's authors ...)?
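
   For illustration only, the error policy described above (keep going on 
retriable errors, fail the start on fatal ones such as an authorization 
failure, and log each kind of error once) could look roughly like this. The 
exception classes and method names are hypothetical stand-ins for the Kafka 
ones.

```java
class LoadErrorPolicySketch {
    // Hypothetical stand-ins for retriable and fatal Kafka exceptions.
    static class RetriableLoadException extends RuntimeException {
        RetriableLoadException(String message) { super(message); }
    }

    static class FatalLoadException extends RuntimeException {
        FatalLoadException(String message) { super(message); }
    }

    private Class<?> lastLoggedErrorType = null;
    int logCount = 0; // exposed so the effect of "log once" can be observed

    // Returns normally for retriable errors (the caller keeps polling);
    // rethrows anything else so that the task start fails visibly.
    void onError(RuntimeException error) {
        if (lastLoggedErrorType != error.getClass()) {
            System.err.println("Error loading Checkpoint topic: " + error);
            logCount++;
            lastLoggedErrorType = error.getClass();
        }
        if (error instanceof RetriableLoadException) {
            return;
        }
        throw error;
    }

    // A successfully read record resets the "already logged" state.
    void onRecord() {
        lastLoggedErrorType = null;
    }

    public static void main(String[] args) {
        LoadErrorPolicySketch policy = new LoadErrorPolicySketch();
        policy.onError(new RetriableLoadException("timeout"));
        policy.onError(new RetriableLoadException("timeout"));
        System.out.println("logged " + policy.logCount + " time(s)");
    }
}
```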
   
   
   
   
   





Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-09 Thread via GitHub


gharris1727 commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2103188026

   Hey @edoardocomar and @prestona thanks for the PR!
   
   One of the reasons I thought this might require a KIP is because it requires 
additional permissions that the current MM2 doesn't need: If an operator has 
already configured ACLs such that MM2 has write permissions for the checkpoints 
topic but no read permissions, it could be operating today and then failing 
after an upgrade with this change. I don't know if that is a common 
configuration or even a recommended one, but it does seem possible in the wild.
   
   Perhaps this can be configuration-less and backwards-compatible if we 
fall back to the old behavior if reading the checkpoints fails for any reason, 
including insufficient permissions.





[PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-09 Thread via GitHub


edoardocomar opened a new pull request, #15910:
URL: https://github.com/apache/kafka/pull/15910

   KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently 
interrupt offset translation
   
   MirrorCheckpointTask reloads the last checkpoints at start; OffsetSyncStore 
stores OffsetSyncs before reading to the end.
   
   Add a test case simulating a restarted task where the store is reinitialized 
with later OffsetSyncs, and check that emitted Checkpoints do not rewind.
   
   Also addresses KAFKA-16622 MirrorMaker2 first Checkpoint not emitted until 
consumer group fully catches up once, because the OffsetSyncStore is 
populated before reading to log end.
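   
   As a simplified illustration of the "do not rewind" property (not the 
comparison MirrorCheckpointTask actually uses), the hypothetical helper below 
drops any candidate whose downstream offset is lower than the last emitted 
one, starting from the offset restored at task start if there is one.

```java
import java.util.ArrayList;
import java.util.List;

class NoRewindSketch {
    // A candidate may be emitted if nothing was emitted yet or it does not move backwards.
    static boolean shouldEmit(Long lastDownstream, long candidateDownstream) {
        return lastDownstream == null || candidateDownstream >= lastDownstream;
    }

    // restored is the downstream offset reloaded at start, or null if none could be read.
    static List<Long> emitted(Long restored, List<Long> candidates) {
        List<Long> out = new ArrayList<>();
        Long last = restored;
        for (long candidate : candidates) {
            if (shouldEmit(last, candidate)) {
                out.add(candidate);
                last = candidate;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emitted(5820L, List.of(100L, 5820L, 7228L, 7000L, 8658L)));
    }
}
```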
   
   Co-Authored-By: Adrian Preston 
   

