[GitHub] [kafka] ijuma commented on pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-20 Thread GitBox


ijuma commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1189943978

   @junrao the updated version looks good to me. Thanks @artemlivshits for the 
patience and iterations.





[GitHub] [kafka] clolov commented on pull request #12410: MINOR: Remove unused ShutdownableThread class and ineffective ThreadedTest classes

2022-07-20 Thread GitBox


clolov commented on PR #12410:
URL: https://github.com/apache/kafka/pull/12410#issuecomment-1189957440

   Looks great to me!





[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568922#comment-17568922
 ] 

ASF GitHub Bot commented on KAFKA-13868:


mimaison merged PR #424:
URL: https://github.com/apache/kafka-site/pull/424




> Website updates to satisfy Apache privacy policies
> --
>
> Key: KAFKA-13868
> URL: https://issues.apache.org/jira/browse/KAFKA-13868
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Mickael Maison
>Assignee: Divij Vaidya
>Priority: Critical
>
> The ASF has updated its privacy policy and all websites must be compliant.
> The full guidelines can be found in 
> [https://privacy.apache.org/faq/committers.html]
> The Kafka website has a few issues, including:
> - It's missing a link to the privacy policy: 
> [https://privacy.apache.org/policies/privacy-policy-public.html]
> - It's using Google Analytics
> - It's using Google Fonts
> - It's using scripts hosted on Cloudflare CDN
> - Embedded videos don't have an image placeholder
> As per the email sent to the PMC, all updates have to be done by July 22.





[GitHub] [kafka] clolov commented on pull request #12409: KAFKA-14058: Migrate ExactlyOnceWorkerSourceTaskTest from EasyMock and PowerMock to Mockito

2022-07-20 Thread GitBox


clolov commented on PR #12409:
URL: https://github.com/apache/kafka/pull/12409#issuecomment-1189989887

   I find this refactor quite readable. Looks good to me :)





[GitHub] [kafka] clolov commented on a diff in pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito

2022-07-20 Thread GitBox


clolov commented on code in PR #12422:
URL: https://github.com/apache/kafka/pull/12422#discussion_r925337713


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java:
##
@@ -53,64 +50,72 @@ public class WorkerConfigTransformerTest {
 public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL";
 public static final String TEST_RESULT_WITH_LONGER_TTL = 
"testResultWithLongerTTL";
 
-@Mock private Herder herder;
-@Mock private Worker worker;
-@Mock private HerderRequest requestId;
+private final Herder herder = Mockito.mock(Herder.class);
+private final Worker worker = Mockito.mock(Worker.class);
+private final HerderRequest requestId = Mockito.mock(HerderRequest.class);

Review Comment:
   If I use `@Mock` I get the following error in some of the tests:
   ```
   java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.runtime.Worker.herder()" because "this.worker" is null
   ```
   I saw that there are multiple approaches to solve this: https://stackoverflow.com/questions/29590621/mock-instance-is-null-after-mock-annotation. Are we following any of those in the code base? If so, I will just change to that approach.
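
   For illustration, a minimal sketch (my own, with a stand-in `Runnable` dependency rather than the Connect types) of the runner-based approach; without a runner, an explicit `MockitoAnnotations.openMocks(this)` call, or a direct `mock(...)` assignment, `@Mock` fields are never populated and stay null, which matches the NPE above:

```java
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

// The runner initializes every @Mock field before each test.
@RunWith(MockitoJUnitRunner.class)
public class MockInitSketchTest {
    @Mock private Runnable task;

    @Before
    public void setup() {
        // task is already a Mockito mock here, thanks to the runner.
    }

    @Test
    public void taskIsInitialized() {
        task.run(); // a no-op on the mock, not an NPE
    }
}
```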









[GitHub] [kafka] clolov commented on a diff in pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito

2022-07-20 Thread GitBox


clolov commented on code in PR #12422:
URL: https://github.com/apache/kafka/pull/12422#discussion_r925339313


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java:
##
@@ -144,10 +149,6 @@ public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback)
 throw new UnsupportedOperationException();
 }
 
-public void unsubscribe(String path, Set<String> keys) {
-throw new UnsupportedOperationException();
-}
-

Review Comment:
   Agreed and added.






[GitHub] [kafka] showuon commented on pull request #12401: Minor: replace .kafka with .log in implementation documentation

2022-07-20 Thread GitBox


showuon commented on PR #12401:
URL: https://github.com/apache/kafka/pull/12401#issuecomment-1190003899

   > I think that's redundant information and most of the other images don't 
have a title. That said, I'm not against adding it back, it's just a matter of 
preference.
   
   Yes, I agree. But I'd like to keep the title as before, because it might have some meaning there that we don't know.
   
   > This an image that can be easily reused in similar contexts, even 
externally. It's pretty hard to make it consistent with text, so I'm only 
addressing Kafka implementation consistency here.
   
   SGTM. Thanks.





[GitHub] [kafka] clolov commented on a diff in pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito

2022-07-20 Thread GitBox


clolov commented on code in PR #12422:
URL: https://github.com/apache/kafka/pull/12422#discussion_r925345205


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java:
##
@@ -53,64 +50,72 @@ public class WorkerConfigTransformerTest {
 public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL";
 public static final String TEST_RESULT_WITH_LONGER_TTL = 
"testResultWithLongerTTL";
 
-@Mock private Herder herder;
-@Mock private Worker worker;
-@Mock private HerderRequest requestId;
+private final Herder herder = Mockito.mock(Herder.class);
+private final Worker worker = Mockito.mock(Worker.class);
+private final HerderRequest requestId = Mockito.mock(HerderRequest.class);
 private WorkerConfigTransformer configTransformer;
 
 @Before
 public void setup() {
-worker = PowerMock.createMock(Worker.class);
-herder = PowerMock.createMock(Herder.class);
 configTransformer = new WorkerConfigTransformer(worker, 
Collections.singletonMap("test", new TestConfigProvider()));
 }
 
 @Test
 public void testReplaceVariable() {
+// Execution
 Map<String, String> result = configTransformer.transform(MY_CONNECTOR, 
Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
+
+// Assertions
 assertEquals(TEST_RESULT, result.get(MY_KEY));
 }
 
 @Test
 public void testReplaceVariableWithTTL() {
-EasyMock.expect(worker.herder()).andReturn(herder);
-
-replayAll();
+// Setup
+when(worker.herder()).thenReturn(herder);
 
+// Execution
 Map<String, String> props = new HashMap<>();
 props.put(MY_KEY, "${test:testPath:testKeyWithTTL}");
 props.put(CONFIG_RELOAD_ACTION_CONFIG, CONFIG_RELOAD_ACTION_NONE);
 Map<String, String> result = configTransformer.transform(MY_CONNECTOR, 
props);
+
+// Assertions
+assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
 }
 
 @Test
 public void testReplaceVariableWithTTLAndScheduleRestart() {
-EasyMock.expect(worker.herder()).andReturn(herder);
-EasyMock.expect(herder.restartConnector(eq(1L), eq(MY_CONNECTOR), 
notNull())).andReturn(requestId);
-replayAll();
+// Setup
+when(worker.herder()).thenReturn(herder);
+when(herder.restartConnector(eq(1L), eq(MY_CONNECTOR), 
notNull())).thenReturn(requestId);
 
+// Execution
 Map<String, String> result = configTransformer.transform(MY_CONNECTOR, 
Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+
+// Assertions

Review Comment:
   Added.




[GitHub] [kafka] clolov commented on a diff in pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito

2022-07-20 Thread GitBox


clolov commented on code in PR #12422:
URL: https://github.com/apache/kafka/pull/12422#discussion_r925346037


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java:
##
@@ -53,64 +50,72 @@ public class WorkerConfigTransformerTest {
 public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL";
 public static final String TEST_RESULT_WITH_LONGER_TTL = 
"testResultWithLongerTTL";
 
-@Mock private Herder herder;
-@Mock private Worker worker;
-@Mock private HerderRequest requestId;
+private final Herder herder = Mockito.mock(Herder.class);
+private final Worker worker = Mockito.mock(Worker.class);
+private final HerderRequest requestId = Mockito.mock(HerderRequest.class);

Review Comment:
   I decided to go for `@RunWith(MockitoJUnitRunner.class)` in the newest 
commit, but I am happy to change it.






[jira] [Commented] (KAFKA-13158) Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImpl Test and ConnectorPluginsResourceTest

2022-07-20 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568944#comment-17568944
 ] 

Christo Lolov commented on KAFKA-13158:
---

ConnectorPluginsResourceTest appears to have already been migrated to Mockito.

> Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImpl Test 
> and ConnectorPluginsResourceTest
> -
>
> Key: KAFKA-13158
> URL: https://issues.apache.org/jira/browse/KAFKA-13158
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: YI-CHEN WANG
>Assignee: YI-CHEN WANG
>Priority: Major
>






[GitHub] [kafka] cadonna commented on a diff in pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-20 Thread GitBox


cadonna commented on code in PR #12285:
URL: https://github.com/apache/kafka/pull/12285#discussion_r925333066


##
streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java:
##
@@ -52,36 +52,46 @@
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+@Timeout(600)
 @Category(IntegrationTest.class)
 public class ErrorHandlingIntegrationTest {
 
+private final String testId;
+private final String appId;
+private final Properties properties;
+
+// Task 0
+private final String inputTopic;
+private final String outputTopic;
+// Task 1
+private final String errorInputTopic;
+private final String errorOutputTopic;
+
 private static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
-@BeforeClass
+ErrorHandlingIntegrationTest(final TestInfo testInfo) {
+testId = safeUniqueTestName(getClass(), testInfo);
+appId = "appId_" + testId;
+properties = props();
+
+inputTopic = "input" + testId;
+outputTopic = "output" + testId;
+
+errorInputTopic = "error-input" + testId;
+errorOutputTopic = "error-output" + testId;
+}

Review Comment:
   Why did you not put this code into the `setup()` method?
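
   For reference, a minimal sketch (a hypothetical `TestInfoInjectionSketch`, not the PR's class) of the two places JUnit 5 can inject `TestInfo`; constructor injection is what allows the fields above to be `final`, which a `setup()` method could not do:

```java
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

class TestInfoInjectionSketch {
    private final String fromConstructor; // final fields need constructor injection
    private String fromSetup;

    // JUnit 5 resolves TestInfo as a constructor parameter...
    TestInfoInjectionSketch(final TestInfo testInfo) {
        this.fromConstructor = testInfo.getDisplayName();
    }

    // ...and equally as a lifecycle-method parameter.
    @BeforeEach
    void setup(final TestInfo testInfo) {
        this.fromSetup = testInfo.getDisplayName();
    }

    @Test
    void bothAreAvailable() {
        // JUnit 5 creates a fresh instance per test, so both fields are set here.
        System.out.println(fromConstructor + " / " + fromSetup);
    }
}
```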






[GitHub] [kafka] cadonna commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-20 Thread GitBox


cadonna commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1190036847

   @clolov could you please rebase your PR since there are conflicts? 





[GitHub] [kafka] clolov opened a new pull request, #12423: KAFKA-13158: Move ConnectClusterStateImpl to use Mockito

2022-07-20 Thread GitBox


clolov opened a new pull request, #12423:
URL: https://github.com/apache/kafka/pull/12423

   Addressing https://issues.apache.org/jira/browse/KAFKA-13158





[GitHub] [kafka] clolov commented on pull request #12423: KAFKA-13158: Move ConnectClusterStateImpl to use Mockito

2022-07-20 Thread GitBox


clolov commented on PR #12423:
URL: https://github.com/apache/kafka/pull/12423#issuecomment-1190043748

   Hi @C0urante hopefully another quick review :)





[jira] [Assigned] (KAFKA-13158) Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImpl Test and ConnectorPluginsResourceTest

2022-07-20 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov reassigned KAFKA-13158:
-

Assignee: Christo Lolov  (was: YI-CHEN WANG)

> Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImpl Test 
> and ConnectorPluginsResourceTest
> -
>
> Key: KAFKA-13158
> URL: https://issues.apache.org/jira/browse/KAFKA-13158
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: YI-CHEN WANG
>Assignee: Christo Lolov
>Priority: Major
>  Labels: connect, runtime
>






[jira] [Updated] (KAFKA-13158) Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImpl Test and ConnectorPluginsResourceTest

2022-07-20 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-13158:
--
Labels: a  (was: connect runtime)

> Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImpl Test 
> and ConnectorPluginsResourceTest
> -
>
> Key: KAFKA-13158
> URL: https://issues.apache.org/jira/browse/KAFKA-13158
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: YI-CHEN WANG
>Assignee: Christo Lolov
>Priority: Major
>  Labels: a
>






[jira] [Updated] (KAFKA-13158) Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImpl Test and ConnectorPluginsResourceTest

2022-07-20 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-13158:
--
Labels: connect runtime  (was: a)

> Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImpl Test 
> and ConnectorPluginsResourceTest
> -
>
> Key: KAFKA-13158
> URL: https://issues.apache.org/jira/browse/KAFKA-13158
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: YI-CHEN WANG
>Assignee: Christo Lolov
>Priority: Major
>  Labels: connect, runtime
>






[GitHub] [kafka] mimaison merged pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure

2022-07-20 Thread GitBox


mimaison merged PR #12320:
URL: https://github.com/apache/kafka/pull/12320





[GitHub] [kafka] fvaleri commented on pull request #12401: Minor: replace .kafka with .log in implementation documentation

2022-07-20 Thread GitBox


fvaleri commented on PR #12401:
URL: https://github.com/apache/kafka/pull/12401#issuecomment-1190050259

   > > I think that's redundant information and most of the other images don't 
have a title. That said, I'm not against adding it back, it's just a matter of 
preference.
   > 
   > Yes, I agree. But I'd like to keep title as before, because it might have 
some meaning there that we don't know.
   
   OK, done.
   





[GitHub] [kafka] ijuma commented on a diff in pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito

2022-07-20 Thread GitBox


ijuma commented on code in PR #12422:
URL: https://github.com/apache/kafka/pull/12422#discussion_r925416275


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java:
##
@@ -53,64 +50,72 @@ public class WorkerConfigTransformerTest {
 public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL";
 public static final String TEST_RESULT_WITH_LONGER_TTL = 
"testResultWithLongerTTL";
 
-@Mock private Herder herder;
-@Mock private Worker worker;
-@Mock private HerderRequest requestId;
+private final Herder herder = Mockito.mock(Herder.class);
+private final Worker worker = Mockito.mock(Worker.class);
+private final HerderRequest requestId = Mockito.mock(HerderRequest.class);

Review Comment:
   Since we are in the process of migrating to JUnit 5, it's simpler if we 
don't rely on JUnit 4 specific features. It's fine to use the `mock` method, 
but please use static imports to make it more concise.
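
   That is, something along these lines (a sketch; the wrapper class name is hypothetical, and the imports assume the types live in `org.apache.kafka.connect.runtime` as in the test above):

```java
import static org.mockito.Mockito.mock;

import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderRequest;
import org.apache.kafka.connect.runtime.Worker;

public class WorkerConfigTransformerTestSketch {
    // The static import keeps the declarations concise:
    private final Herder herder = mock(Herder.class);
    private final Worker worker = mock(Worker.class);
    private final HerderRequest requestId = mock(HerderRequest.class);
}
```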






[jira] [Updated] (KAFKA-13158) Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImplTest

2022-07-20 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13158:

Summary: Replace EasyMock and PowerMock with Mockito for 
ConnectClusterStateImplTest  (was: Replace EasyMock and PowerMock with Mockito 
for ConnectClusterStateImpl Test and ConnectorPluginsResourceTest)

> Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImplTest
> ---
>
> Key: KAFKA-13158
> URL: https://issues.apache.org/jira/browse/KAFKA-13158
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: YI-CHEN WANG
>Assignee: Christo Lolov
>Priority: Major
>  Labels: connect, runtime
>






[jira] [Commented] (KAFKA-13158) Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImplTest

2022-07-20 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568965#comment-17568965
 ] 

Ismael Juma commented on KAFKA-13158:
-

Updated the Jira title to only refer to ConnectClusterStateImplTest.

> Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImplTest
> ---
>
> Key: KAFKA-13158
> URL: https://issues.apache.org/jira/browse/KAFKA-13158
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: YI-CHEN WANG
>Assignee: Christo Lolov
>Priority: Major
>  Labels: connect, runtime
>






[jira] [Resolved] (KAFKA-13702) Connect RestClient overrides response status code on request failure

2022-07-20 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-13702.

Fix Version/s: 3.4.0
   Resolution: Fixed

> Connect RestClient overrides response status code on request failure
> 
>
> Key: KAFKA-13702
> URL: https://issues.apache.org/jira/browse/KAFKA-13702
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Elkhan Eminov
>Assignee: Elkhan Eminov
>Priority: Major
> Fix For: 3.4.0
>
>
> In case the submitted request status is >=400, the connect RestClient 
> [throws|https://github.com/apache/kafka/blob/8047ba3800436d6162d0f8eb707e28857ab9eb68/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L133]
>  a ConnectRestException with the proper response code, but it gets 
> intercepted and [rethrown with 500 status 
> code|https://github.com/apache/kafka/blob/8047ba3800436d6162d0f8eb707e28857ab9eb68/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L147],
>  effectively overriding the actual failure status. 
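
A minimal sketch of the failure mode described in this issue, using simplified stand-in types rather than the real Connect classes:

```java
// Simplified stand-in for Connect's ConnectRestException.
class RestExceptionSketch extends RuntimeException {
    final int statusCode;
    RestExceptionSketch(int statusCode, String message) {
        super(message);
        this.statusCode = statusCode;
    }
}

public class RestClientSketch {
    static String httpRequest(int simulatedResponseStatus) {
        try {
            if (simulatedResponseStatus >= 400) {
                // The correct status is attached to the exception here...
                throw new RestExceptionSketch(simulatedResponseStatus, "request failed");
            }
            return "ok";
        } catch (Exception e) {
            // ...but a catch-all rethrows everything as 500. The fix is to
            // let the REST exception propagate before this generic handler.
            throw new RestExceptionSketch(500, "IO error: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        try {
            httpRequest(404);
        } catch (RestExceptionSketch e) {
            System.out.println("caller sees status " + e.statusCode); // 500, not 404
        }
    }
}
```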





[GitHub] [kafka] mimaison closed pull request #11844: KAFKA#13702 - Connect RestClient overrides response status code on request failure

2022-07-20 Thread GitBox


mimaison closed pull request #11844: KAFKA#13702 - Connect RestClient overrides 
response status code on request failure
URL: https://github.com/apache/kafka/pull/11844





[GitHub] [kafka] mimaison commented on pull request #11844: KAFKA#13702 - Connect RestClient overrides response status code on request failure

2022-07-20 Thread GitBox


mimaison commented on PR #11844:
URL: https://github.com/apache/kafka/pull/11844#issuecomment-1190088514

   We merged https://github.com/apache/kafka/pull/12320, so closing this PR





[GitHub] [kafka] showuon merged pull request #12401: Minor: replace .kafka with .log in implementation documentation

2022-07-20 Thread GitBox


showuon merged PR #12401:
URL: https://github.com/apache/kafka/pull/12401





[GitHub] [kafka] mimaison commented on pull request #11838: KAFKA-1372: separate 400 error from 500 error in RestClient

2022-07-20 Thread GitBox


mimaison commented on PR #11838:
URL: https://github.com/apache/kafka/pull/11838#issuecomment-1190089277

   We merged https://github.com/apache/kafka/pull/12320, so closing this PR





[GitHub] [kafka] mimaison closed pull request #11838: KAFKA-1372: separate 400 error from 500 error in RestClient

2022-07-20 Thread GitBox


mimaison closed pull request #11838: KAFKA-1372: separate 400 error from 500 
error in RestClient
URL: https://github.com/apache/kafka/pull/11838





[GitHub] [kafka] showuon commented on pull request #12401: Minor: replace .kafka with .log in implementation documentation

2022-07-20 Thread GitBox


showuon commented on PR #12401:
URL: https://github.com/apache/kafka/pull/12401#issuecomment-1190089583

   Only a doc change, so we don't need to worry about test results. 





[GitHub] [kafka] tombentley commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-20 Thread GitBox


tombentley commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r925348863


##
core/src/main/scala/kafka/log/LogLoader.scala:
##
@@ -64,6 +65,7 @@ object LogLoader extends Logging {
  * @param recoveryPointCheckpoint The checkpoint of the offset at which to 
begin the recovery
  * @param leaderEpochCache An optional LeaderEpochFileCache instance to be 
updated during recovery
  * @param producerStateManager The ProducerStateManager instance to be updated 
during recovery
+ * @param numRemainingSegments The remaining segments to be recovered in this 
log

Review Comment:
   ```suggestion
* @param numRemainingSegments The remaining segments to be recovered in 
this log keyed by recovery thread name
   ```



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -307,6 +309,27 @@ class LogManager(logDirs: Seq[File],
 log
   }
 
+  // factory class for naming the log recovery threads used in metrics
+  class LogRecoveryThreadFactory(val dirPath: String) extends ThreadFactory {
+val threadNum = new AtomicInteger(0)
+
+override def newThread(runnable: Runnable): Thread = {
+  KafkaThread.nonDaemon(logRecoveryThreadName(dirPath, 
threadNum.getAndIncrement()), runnable)
+}
+  }
+
+  // create a unique log recovery thread name for each log dir as the format: 
prefix-dirPath-threadNum, ex: "log-recovery-/tmp/kafkaLogs-0"
+  private def logRecoveryThreadName(dirPath: String, threadNum: Int, prefix: 
String = "log-recovery"): String = s"$prefix-$dirPath-$threadNum"
+
+  /*
+   * decrement the number of remaining logs
+   * @return the number of remaining logs after decremented 1
+   */
+  private[log] def decNumRemainingLogs(numRemainingLogs: ConcurrentMap[String, 
AtomicInteger], path: String): Int = {
+require(path != null, "path cannot be null to update remaining logs 
metric.")
+numRemainingLogs.get(path).decrementAndGet()

Review Comment:
   Is it not possible to use `numRemainingLogs.compute()` and avoid the need 
for the `AtomicInteger`?
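
   For illustration, a sketch of the suggested alternative (in Java for brevity; the PR code is Scala). `ConcurrentMap.compute` runs the remapping function atomically for the key, so a plain `Integer` value suffices:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class RemainingLogsSketch {
    public static void main(String[] args) {
        final ConcurrentMap<String, Integer> numRemainingLogs = new ConcurrentHashMap<>();
        numRemainingLogs.put("/tmp/kafkaLogs", 3);

        // The whole remapping runs atomically for this key, replacing the
        // AtomicInteger-based decrementAndGet().
        final int remaining = numRemainingLogs.compute("/tmp/kafkaLogs", (path, old) -> old - 1);
        System.out.println(remaining); // 2
    }
}
```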






[jira] [Commented] (KAFKA-14089) Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic

2022-07-20 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568972#comment-17568972
 ] 

Mickael Maison commented on KAFKA-14089:


cc [~ChrisEgerton]

> Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
> ---
>
> Key: KAFKA-14089
> URL: https://issues.apache.org/jira/browse/KAFKA-14089
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Priority: Major
> Attachments: failure.txt
>
>
> It looks like the sequence got broken around "65535, 65537, 65536, 65539, 
> 65538, 65541, 65540, 65543"





[jira] [Created] (KAFKA-14089) Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic

2022-07-20 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14089:
--

 Summary: Flaky 
ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
 Key: KAFKA-14089
 URL: https://issues.apache.org/jira/browse/KAFKA-14089
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.3.0
Reporter: Mickael Maison
 Attachments: failure.txt

It looks like the sequence got broken around "65535, 65537, 65536, 65539, 
65538, 65541, 65540, 65543"





[GitHub] [kafka] viktorsomogyi commented on pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

2022-07-20 Thread GitBox


viktorsomogyi commented on PR #12392:
URL: https://github.com/apache/kafka/pull/12392#issuecomment-1190132574

   @hachikuji would you please review this PR as well?





[GitHub] [kafka] urbandan commented on pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

2022-07-20 Thread GitBox


urbandan commented on PR #12392:
URL: https://github.com/apache/kafka/pull/12392#issuecomment-1190135063

   @showuon @artemlivshits Can you please take a look at this PR? This is the 
issue we had a thread about on the dev list.





[GitHub] [kafka] cadonna commented on a diff in pull request #12397: KAFKA-10199: Cleanup TaskManager and Task interfaces

2022-07-20 Thread GitBox


cadonna commented on code in PR #12397:
URL: https://github.com/apache/kafka/pull/12397#discussion_r925471614


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1307,9 +1307,9 @@ public Set characteristics() {
 }
 
 @SafeVarargs
-public static <T> Set<T> union(final Supplier<Set<T>> constructor, final Set<T>... set) {
+public static <T> Set<T> union(final Supplier<Set<T>> constructor, final Collection<T>... set) {
 final Set<T> result = constructor.get();
-for (final Set<T> s : set) {
+for (final Collection<T> s : set) {

Review Comment:
   This seems a bit weird to me. I see that you changed this because `Tasks#allTaskIds()` returns a `Collection` instead of a `Set`. I think it is fine to let `Tasks#allTaskIds()` return a `Set`. The other reason you changed it is that `Tasks#allTasks()` computes its result from two collections. Those two collections can be transformed to sets. Maybe we should also consider changing the return type of `allTasks()` to a set.
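
   For reference, a self-contained sketch of the widened signature from the hunk above (generics reconstructed; the usage example is illustrative):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;

public class UnionSketch {
    // The PR's variant: accepts any Collection, not just Set.
    @SafeVarargs
    static <T> Set<T> union(final Supplier<Set<T>> constructor, final Collection<T>... parts) {
        final Set<T> result = constructor.get();
        for (final Collection<T> part : parts) {
            result.addAll(part);
        }
        return result;
    }

    public static void main(String[] args) {
        final List<Integer> a = Arrays.asList(1, 2);               // a List now works...
        final Set<Integer> b = new HashSet<>(Arrays.asList(2, 3)); // ...alongside a Set
        System.out.println(union(TreeSet::new, a, b));             // [1, 2, 3]
    }
}
```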



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -34,29 +33,40 @@
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.utils.Utils.filterMap;
+import static org.apache.kafka.common.utils.Utils.union;
+
+/**
+ * All tasks contained by the Streams instance.
+ *
+ * Note that these tasks are shared between the TaskManager (stream thread) 
and the StateUpdater (restore thread),
+ * i.e. all running active tasks are processed by the former and all restoring 
active tasks and standby tasks are
+ * processed by the latter.
+ */
 class Tasks {
 private final Logger log;
-private final TopologyMetadata topologyMetadata;
-
-private final Map allTasksPerId = 
Collections.synchronizedSortedMap(new TreeMap<>());
-private final Map readOnlyTasksPerId = 
Collections.unmodifiableMap(allTasksPerId);
-private final Collection readOnlyTasks = 
Collections.unmodifiableCollection(allTasksPerId.values());
 
 // TODO: change type to `StreamTask`
 private final Map activeTasksPerId = new TreeMap<>();
+// TODO: change type to `StandbyTask`
+private final Map standbyTasksPerId = new TreeMap<>();
+
+// Tasks may have been assigned for a NamedTopology that is not yet known 
by this host. When that occurs we stash
+// these unknown tasks until either the corresponding NamedTopology is 
added and we can create them at last, or
+// we receive a new assignment and they are revoked from the thread.
+
+// Tasks may have been assigned but not yet created because:
+// 1. They are for a NamedTopology that is yet known by this host.
+// 2. They are to be recycled from an existing restoring task yet to be 
returned from the task updater.

Review Comment:
   ```suggestion
   // 2. They are to be recycled from an existing restoring task yet to be 
returned from the state updater.
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java:
##
@@ -55,25 +57,28 @@ public abstract class AbstractTask implements Task {
 protected Map offsetSnapshotSinceLastFlush = null;
 
 protected final TaskId id;
+protected final TaskConfig config;
 protected final ProcessorTopology topology;
 protected final StateDirectory stateDirectory;
 protected final ProcessorStateManager stateMgr;
+
 private final long taskTimeoutMs;
 
 AbstractTask(final TaskId id,
  final ProcessorTopology topology,
  final StateDirectory stateDirectory,
  final ProcessorStateManager stateMgr,
  final Set inputPartitions,
- final long taskTimeoutMs,
+ final TaskConfig config,
  final String taskType,
  final Class clazz) {
 this.id = id;
 this.stateMgr = stateMgr;
 this.topology = topology;
+this.config = config;
 this.inputPartitions = inputPartitions;
 this.stateDirectory = stateDirectory;
-this.taskTimeoutMs = taskTimeoutMs;
+this.taskTimeoutMs = config.taskTimeoutMs;

Review Comment:
   I would not use a dedicated field for the task timeout but just directly use 
the task config object `config`.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -80,125 +87,165 @@ void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
 this.mainConsumer = mainConsumer;
 }
 
-void handleNewAssignmentAndCreateTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-   final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-   final Set<TaskId> assignedActiveTasks,
-   final Set<TaskId> assignedStandbyTasks) {
-activeTaskCreator.removeRevokedUnknownTasks(assignedActiveTasks);
-standbyTaskCreator.removeR

[GitHub] [kafka] showuon commented on pull request #11783: KAFKA-10000: System tests (KIP-618)

2022-07-20 Thread GitBox


showuon commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1190242675

   @C0urante , my team members are trying to configure the testing environment 
to run a system test against this branch. But it still needs some time.
   
   @jsancio , it would be great if you can help run the system test against 
this branch.
   
   Thank you.





[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-20 Thread GitBox


showuon commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r925591743


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -307,6 +309,27 @@ class LogManager(logDirs: Seq[File],
 log
   }
 
+  // factory class for naming the log recovery threads used in metrics
+  class LogRecoveryThreadFactory(val dirPath: String) extends ThreadFactory {
+val threadNum = new AtomicInteger(0)
+
+override def newThread(runnable: Runnable): Thread = {
+  KafkaThread.nonDaemon(logRecoveryThreadName(dirPath, 
threadNum.getAndIncrement()), runnable)
+}
+  }
+
+  // create a unique log recovery thread name for each log dir as the format: 
prefix-dirPath-threadNum, ex: "log-recovery-/tmp/kafkaLogs-0"
+  private def logRecoveryThreadName(dirPath: String, threadNum: Int, prefix: 
String = "log-recovery"): String = s"$prefix-$dirPath-$threadNum"
+
+  /*
+   * decrement the number of remaining logs
+   * @return the number of remaining logs after decremented 1
+   */
+  private[log] def decNumRemainingLogs(numRemainingLogs: ConcurrentMap[String, 
AtomicInteger], path: String): Int = {
+require(path != null, "path cannot be null to update remaining logs 
metric.")
+numRemainingLogs.get(path).decrementAndGet()

Review Comment:
   Good point, Tom! Replaced AtomicInteger with Int, and used compute instead.
   ```
   numRemainingLogs.compute(path, (_, oldVal) => oldVal - 1)
   ```
   I confirmed from the javadoc[1]
   > The entire method invocation is performed atomically.
   
   So it is still thread safe. Thank you for the suggestion!
   
   [1] 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#compute-K-java.util.function.BiFunction-






[GitHub] [kafka] showuon commented on pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-20 Thread GitBox


showuon commented on PR #12347:
URL: https://github.com/apache/kafka/pull/12347#issuecomment-1190273171

   @tombentley , I've addressed your comments in this commit: 
https://github.com/apache/kafka/pull/12347/commits/42cd5ac41eb2cc83548066ea053dad47a0eddd22
 . Thank you.





[jira] [Updated] (KAFKA-14002) Update Zookeeper client version to 3.8.0

2022-07-20 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-14002:
-
Fix Version/s: (was: 3.2.1)

> Update Zookeeper client version to 3.8.0
> 
>
> Key: KAFKA-14002
> URL: https://issues.apache.org/jira/browse/KAFKA-14002
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kobi Hikri
>Priority: Minor
>
> We need to update kafka to use the zookeeper client version 3.8.0, as 
> important bug fixes are included in it.





[jira] [Commented] (KAFKA-14002) Update Zookeeper client version to 3.8.0

2022-07-20 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569027#comment-17569027
 ] 

David Arthur commented on KAFKA-14002:
--

Removing 3.2.1 from the fix version.

> Update Zookeeper client version to 3.8.0
> 
>
> Key: KAFKA-14002
> URL: https://issues.apache.org/jira/browse/KAFKA-14002
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kobi Hikri
>Priority: Minor
>
> We need to update kafka to use the zookeeper client version 3.8.0, as 
> important bug fixes are included in it.





[GitHub] [kafka] mumrah opened a new pull request, #12424: RELEASE: Add 3.2 upgrade docs

2022-07-20 Thread GitBox


mumrah opened a new pull request, #12424:
URL: https://github.com/apache/kafka/pull/12424

   Looking through the issues fixed in 
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.2.1, I didn't 
see any changes to public APIs (config/metrics/CLI/etc) or any changes to 
default behaviors. I picked three major issues to include in the release notes.
   
   * https://issues.apache.org/jira/browse/KAFKA-14062 OAuth refresh problem -- 
the driver for this release
   * https://issues.apache.org/jira/browse/KAFKA-14079 a major Connect OOM issue
   * https://issues.apache.org/jira/browse/KAFKA-14024 3.2.0 consumer regression





[GitHub] [kafka] rhauch commented on a diff in pull request #12424: RELEASE: Add 3.2 upgrade docs

2022-07-20 Thread GitBox


rhauch commented on code in PR #12424:
URL: https://github.com/apache/kafka/pull/12424#discussion_r925721375


##
docs/upgrade.html:
##
@@ -61,6 +61,20 @@ Upgrading to 
3.2.0 from any vers
 
 
 
+Notable changes in 
3.2.1
+
+A problem with OAuth token refresh was fixed. During token 
refresh, the Kafka client was not including
+the SASL extensions which were present during the initial 
authentication. This leads to authentication
+failures. See https://issues.apache.org/jira/browse/KAFKA-14062";>KAFKA-14062 for 
details.
+A problem in Connect with error.tolerance was fixed. 
When error.tolerance
+was set to all, it was possible for offsets to not be 
committed and a memory leak to occur
+(eventually resulting in an OutOfMemoryError). 

Review Comment:
   Minor suggestions:
   ```suggestion
   A problem in Connect with error.tolerance was 
fixed. Starting in 3.2.0, when
   a source connector uses errors.tolerance=all and 
Connect's producer fails to
   successfully send a source record to the topic, the error is 
properly reported and the connector
   continues, but Connect stops committing source offsets for that 
source connector and a
   memory leak will eventually cause the Connect worker to fail 
with an OutOfMemoryError. 
   ```






[GitHub] [kafka] junrao merged pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-20 Thread GitBox


junrao merged PR #12365:
URL: https://github.com/apache/kafka/pull/12365





[GitHub] [kafka] junrao commented on pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-20 Thread GitBox


junrao commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1190432816

   cherry-picked the PR to 3.3.





[jira] [Resolved] (KAFKA-14020) Performance regression in Producer

2022-07-20 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14020.
-
Resolution: Fixed

merged the PR to 3.3.

> Performance regression in Producer
> --
>
> Key: KAFKA-14020
> URL: https://issues.apache.org/jira/browse/KAFKA-14020
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.3.0
>Reporter: John Roesler
>Assignee: Artem Livshits
>Priority: Blocker
> Fix For: 3.3.0
>
>
> [https://github.com/apache/kafka/commit/f7db6031b84a136ad0e257df722b20faa7c37b8a]
>  introduced a 10% performance regression in the KafkaProducer under a default 
> config.
>  
> The context for this result is a benchmark that we run for Kafka Streams. The 
> benchmark provisions 5 independent AWS clusters, including one broker node on 
> an i3.large and one client node on an i3.large. During a benchmark run, we 
> first run the Producer for 10 minutes to generate test data, and then we run 
> Kafka Streams under a number of configurations to measure its performance.
> Our observation was a 10% regression in throughput under the simplest 
> configuration, in which Streams simply consumes from a topic and does nothing 
> else. That benchmark actually runs faster than the producer that generates 
> the test data, so its thoughput is bounded by the data generator's 
> throughput. After investigation, we realized that the regression was in the 
> data generator, not the consumer or Streams.
> We have numerous benchmark runs leading up to the commit in question, and 
> they all show a throughput in the neighborhood of 115,000 records per second. 
> We also have 40 runs including and after that commit, and they all show a 
> throughput in the neighborhood of 105,000 records per second. A test on 
> [trunk with the commit reverted |https://github.com/apache/kafka/pull/12342] 
> shows a return to around 115,000 records per second.
> Config:
> {code:java}
> final Properties properties = new Properties();
> properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
> properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class);
> properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class);
> {code}
> Here's the producer code in the data generator. Our tests were running with 
> three produceThreads.
> {code:java}
>  for (int t = 0; t < produceThreads; t++) {
> futures.add(executorService.submit(() -> {
> int threadTotal = 0;
> long lastPrint = start;
> final long printInterval = Duration.ofSeconds(10).toMillis();
> long now;
> try (final org.apache.kafka.clients.producer.Producer<String, String> 
> producer = new KafkaProducer<>(producerConfig(broker))) {
> while (limit > (now = System.currentTimeMillis()) - start) {
> for (int i = 0; i < 1000; i++) {
> final String key = keys.next();
> final String data = dataGen.generate();
> producer.send(new ProducerRecord<>(topic, key, 
> valueBuilder.apply(key, data)));
> threadTotal++;
> }
> if ((now - lastPrint) > printInterval) {
> System.out.println(Thread.currentThread().getName() + " 
> produced " + numberFormat.format(threadTotal) + " to " + topic + " in " + 
> Duration.ofMillis(now - start));
> lastPrint = now;
> }
> }
> }
> total.addAndGet(threadTotal);
> System.out.println(Thread.currentThread().getName() + " finished (" + 
> numberFormat.format(threadTotal) + ") in " + Duration.ofMillis(now - start));
> }));
> }{code}
> As you can see, this is a very basic usage.





[jira] [Commented] (KAFKA-13953) kafka Console consumer fails with CorruptRecordException

2022-07-20 Thread Doguscan Namal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569104#comment-17569104
 ] 

Doguscan Namal commented on KAFKA-13953:


[~junrao] 

1) The DumpLogSegment tool fails to output the data because it is corrupted. 
But here is the batch level info:
baseOffset: 88062356025 lastOffset: 88062429966 count: 73942 baseSequence: 
39997558 lastSequence: 40071499 producerId: 12 producerEpoch: 0 
partitionLeaderEpoch: 70 isTransactional: false isControl: false position: 
86385503 CreateTime: 1656027642522 size: 2543176 magic: 2 compresscodec: ZSTD 
crc: 2665675442 isvalid: true
 
One more point: the consumer does not actually receive a CorruptRecordException. The broker doesn't give anything back to the consumer at all.

The upstream message comes from a 2.8.1 KafkaCluster and data is read by a 
MirrorMaker 2.5.1 and written to a 2.5.1 cluster. Only one of the "sink" 
clusters ends up with the corrupted data.

 

2) DumpLogSegment is failing to provide record-level information because there is a corrupted record. However, I am able to read all of the records up to the corrupted one, which has a negative size.


So here is the structure of the log file:

So all the batches are valid up to the one which contains the corrupted record.
All records in the corrupted batch are valid, up to the one which is corrupted.

readVarint() returns -155493822 from the byte stream.


```
batch0
| record 0
| record 1
| ...
batch1
| record 0
| record 1
| ...
...
batchX 
| record 0
| record 1
| record n
| bytes (-155493822) bytes (recordStart:0 attributes: 0 timestampDelta: 391 
timestamp 1656027641475 offset: 88062375700 sequence:40017233 key: 
java.nio.HeapByteBuffer[pos=0 lim=25 cap=1199] value: 
java.nio.HeapByteBuffer[pos=0 lim=961 cap=1149] numHeaders: 5) ...

... NOT REACHABLE, POSSIBLY NOT CORRUPTED, BUT NO WAY TO VERIFY ...

batchNOTReachable 
| record...

| record...
```

Given that crc check is valid, any idea on the next steps in debugging?
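
A rough sketch (simplified, not the actual Kafka record classes) of how a record-size varint is decoded and validated; a corrupted stream can easily decode to zero or a negative value like the -155493822 above, which then trips the minimum-overhead check seen in the exception below:

```java
import java.nio.ByteBuffer;

public class RecordSizeCheckSketch {
    // The minimum record overhead quoted in the stack trace below.
    static final int MIN_RECORD_OVERHEAD = 14;

    // Zigzag-encoded varint, as used for record sizes in the v2 message format.
    static int readVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            value |= (b & 0x7f) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (value >>> 1) ^ -(value & 1); // zigzag decode
    }

    static void checkRecordSize(ByteBuffer buf) {
        final int size = readVarint(buf);
        if (size < MIN_RECORD_OVERHEAD) {
            throw new IllegalStateException("Record size " + size
                    + " is less than the minimum record overhead (" + MIN_RECORD_OVERHEAD + ")");
        }
    }

    public static void main(String[] args) {
        // A single zero byte decodes to size 0 -> flagged as corrupt.
        checkRecordSize(ByteBuffer.wrap(new byte[]{0x00}));
    }
}
```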

> kafka Console consumer fails with CorruptRecordException 
> -
>
> Key: KAFKA-13953
> URL: https://issues.apache.org/jira/browse/KAFKA-13953
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, controller, core
>Affects Versions: 2.7.0
>Reporter: Aldan Brito
>Priority: Blocker
>
> Kafka consumer fails with corrupt record exception. 
> {code:java}
> opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*: 
> --topic BQR-PULL-DEFAULT --from-beginning > 
> /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest
> [{*}2022-05-15 18:34:15,146]{*} ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record 
> to continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>         at 
> kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438)
>         at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
>         at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
>         at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
>         at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 
> 0 is less than the minimum record overhead (14)
> Processed a total of 15765197 messages {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925804187


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends 
AbstractResetIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+protected static final int TIMEOUT_MULTIPLIER = 15;
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Before
+public void before() throws Exception {
+mockTime = CLUSTER.time;
+
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+commonClientConfig = new Properties();
+commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+
+streamsConfig = new Properties();
+streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
+streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
S

[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925804504


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends 
AbstractResetIntegrationTest {

Review Comment:
   +1; done.



##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka

[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925805004


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends 
AbstractResetIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+protected static final int TIMEOUT_MULTIPLIER = 15;
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Before
+public void before() throws Exception {
+mockTime = CLUSTER.time;
+
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+commonClientConfig = new Properties();
+commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+
+streamsConfig = new Properties();
+streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
+streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
S

[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925808463


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends 
AbstractResetIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+protected static final int TIMEOUT_MULTIPLIER = 15;
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Before
+public void before() throws Exception {
+mockTime = CLUSTER.time;
+
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+commonClientConfig = new Properties();
+commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+
+streamsConfig = new Properties();
+streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
+streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
S

[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925809931


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 2;

Review Comment:
   Ok, I think I figured it out.
   
   There was a little non-determinism in the `close()` call.  
   
   The first approach I was using removed the consumers from the CG.  I *think* 
that if the StreamThreads were closed quickly enough, everything was fine.  If 
not, the StreamThreads re-subscribed to the CG and then we had to wait for the 
SESSION_TIMEOUT in order for the CG to be empty again.
   
   I've updated the `close(CloseOptions)` method to close the StreamThreads 
first.  This means that `close` has to get a new Admin in order to remove 
things from the CG.
   
   The only other option I could see would be pushing the work of leaving the 
CG to each StreamThread.  Maybe that's a better approach.  What do you think?
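   
   For what it's worth, a minimal sketch of the removal step, assuming static 
membership (Admin.removeMembersFromConsumerGroup addresses members by their 
group.instance.id; the names and wiring here are simplified, not the PR's 
actual code):
   
   ```
   import java.util.List;
   import java.util.Properties;
   import java.util.stream.Collectors;

   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.MemberToRemove;
   import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

   public class LeaveGroupSketch {

       // Sketch: after all StreamThreads have been shut down, remove the
       // static members from the consumer group so it becomes empty
       // immediately instead of after session.timeout.ms expires.
       static void leaveGroup(final Properties adminConfig,
                              final String applicationId,
                              final List<String> groupInstanceIds) throws Exception {
           try (Admin admin = Admin.create(adminConfig)) {
               final List<MemberToRemove> members = groupInstanceIds.stream()
                   .map(MemberToRemove::new)
                   .collect(Collectors.toList());
               admin.removeMembersFromConsumerGroup(
                       applicationId,
                       new RemoveMembersFromConsumerGroupOptions(members))
                   .all()
                   .get();
           }
       }
   }
   ```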



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13700) Kafka reporting CorruptRecordException exception

2022-07-20 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569114#comment-17569114
 ] 

Divij Vaidya commented on KAFKA-13700:
--

A few questions to understand the situation better. Let's focus on a specific 
record.

1. Is this occurring for non-reserved topics too?
2. Is the same record corrupted across all replicas? (You can validate this by 
doing a leader failover and reading the offset.)
3. Could you please provide the DumpLogSegments result for the segment which 
contains the corrupted offset? Please provide both the batch-level information 
and the record-level information (the latter is obtained by using the 
--print-data-log argument).
4. Do you happen to know which producer produced the records? What snappy 
version was used by the producer, and what snappy version is used by the Kafka 
server?
5. Please add the stack trace for "[2022-02-28 22:17:00,235] ERROR [ReplicaFetcher 
replicaId=6, leaderId=1, fetcherId=0] Found invalid messages"

The interesting thing in your case is that the crc check for the batch 
(isvalid: true) passes when using DumpLogSegments (which means that the data is 
correctly stored on disk) but fails when using fetch, which points towards a 
bug in the fetch/consumer logic.
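
For question 3, a command along these lines prints both the batch-level and 
record-level information (the segment path here is a placeholder):

```
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --deep-iteration --print-data-log \
  --files /path/to/<topic-partition>/<segment>.log
```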

> Kafka reporting CorruptRecordException exception
> 
>
> Key: KAFKA-13700
> URL: https://issues.apache.org/jira/browse/KAFKA-13700
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.1
> Environment: ubuntu 16.04
> kafka 2.4
>Reporter: Uday Bhaskar
>Priority: Critical
>
> In our kafka cluster, a couple of partitions in __consumer_offsets and 1 
> regular topic are hitting a data corruption issue while replicas try to read 
> from the leader. Similar messages appear for other partitions as well. 
>  
> [2022-02-28 21:57:29,941] ERROR [ReplicaFetcher replicaId=6, leaderId=1, 
> fetcherId=2] Found invalid messages during fetch for partition 
> __consumer_offsets-10 offset 108845487 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.CorruptRecordException: Record is corrupt 
> (stored crc = 1524235439) in topic partition __consumer_offsets-10
>  
> another topic partitions with same errors
> [2022-02-28 22:17:00,235] ERROR [ReplicaFetcher replicaId=6, leaderId=1, 
> fetcherId=0] Found invalid messages during fetch for partition 
> px-11351-xx-a56c642-0 offset 11746872 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.CorruptRecordException: Record is corrupt 
> (stored crc = 475179617) in topic partition px-11351-xx-a56c642-0.
>  
> I have verified all infrastructure, disk, network and system for any errors 
> and found nothing. I am not sure why it is happening or how to 
> troubleshoot.  
>  
> Below is the output of the message from DumpLogSegments, 
>  
> $ /opt/ns/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
> --verify-index-only --deep-iteration --files ./11324034.log | 
> grep 11746872
> baseOffset: 11746872 lastOffset: 11746872 count: 1 baseSequence: 50278 
> lastSequence: 50278 producerId: 17035 producerEpoch: 0 partitionLeaderEpoch: 
> 8 isTransactional: false isControl: false position: 252530345 CreateTime: 
> 1645886348240 size: 647 magic: 2 compresscodec: SNAPPY crc: 475179617 
> isvalid: true
> | offset: 11746872 CreateTime: 1645886348240 keysize: 54 valuesize: 637 
> sequence: 50278 headerKeys: []



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13700) Kafka reporting CorruptRecordException exception

2022-07-20 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569115#comment-17569115
 ] 

Divij Vaidya commented on KAFKA-13700:
--

cc: [~junrao] for visibility since he is answering another corruption issue in 
https://issues.apache.org/jira/browse/KAFKA-13953

> Kafka reporting CorruptRecordException exception
> 
>
> Key: KAFKA-13700
> URL: https://issues.apache.org/jira/browse/KAFKA-13700
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.1
> Environment: ubuntu 16.04
> kafka 2.4
>Reporter: Uday Bhaskar
>Priority: Critical
>
> In our kafka cluster, a couple of partitions in __consumer_offsets and 1 
> regular topic are hitting a data corruption issue while replicas try to read 
> from the leader. Similar messages appear for other partitions as well. 
>  
> [2022-02-28 21:57:29,941] ERROR [ReplicaFetcher replicaId=6, leaderId=1, 
> fetcherId=2] Found invalid messages during fetch for partition 
> __consumer_offsets-10 offset 108845487 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.CorruptRecordException: Record is corrupt 
> (stored crc = 1524235439) in topic partition __consumer_offsets-10
>  
> another topic partitions with same errors
> [2022-02-28 22:17:00,235] ERROR [ReplicaFetcher replicaId=6, leaderId=1, 
> fetcherId=0] Found invalid messages during fetch for partition 
> px-11351-xx-a56c642-0 offset 11746872 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.CorruptRecordException: Record is corrupt 
> (stored crc = 475179617) in topic partition px-11351-xx-a56c642-0.
>  
> I have verified all infrastructure, disk, network and system for any errors 
> and found nothing. I am not sure why it is happening or how to 
> troubleshoot.  
>  
> Below is the output of the message from DumpLogSegments, 
>  
> $ /opt/ns/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
> --verify-index-only --deep-iteration --files ./11324034.log | 
> grep 11746872
> baseOffset: 11746872 lastOffset: 11746872 count: 1 baseSequence: 50278 
> lastSequence: 50278 producerId: 17035 producerEpoch: 0 partitionLeaderEpoch: 
> 8 isTransactional: false isControl: false position: 252530345 CreateTime: 
> 1645886348240 size: 647 magic: 2 compresscodec: SNAPPY crc: 475179617 
> isvalid: true
> | offset: 11746872 CreateTime: 1645886348240 keysize: 54 valuesize: 637 
> sequence: 50278 headerKeys: []



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on pull request #12378: MINOR : lower Metadata info log to debug for topic ID change

2022-07-20 Thread GitBox


jolshan commented on PR #12378:
URL: https://github.com/apache/kafka/pull/12378#issuecomment-1190539986

   Ah, looks like the issue is that the old topic ID is null (not the new ID).
   
   As described by this comment:
   
   ```  
   // If the new topic ID is valid and different from the last seen topic 
ID, update the metadata.
   // Between the time that a topic is deleted and re-created, the client 
may lose track of the
  // corresponding topicId (i.e. `oldTopicId` will be null). In this case, 
when we discover the new
  // topicId, we allow the corresponding leader epoch to override the last 
seen value.
   ```
   
   So based on this comment, we do want to override the epoch. Just curious why 
this one extra log message was causing issues. I suppose one thing we could do 
is check if this is the first metadata response / if the epochs are equal, but 
I'm not sure how much that helps.
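   
   In other words, the condition is roughly the following (a paraphrase of the 
comment quoted above, not the exact Metadata client code):
   
   ```
   import org.apache.kafka.common.Uuid;

   public class TopicIdEpochCheck {

       // Paraphrase of the condition described above: when the old topic ID
       // was lost (null), e.g. across a delete/re-create, a valid newly
       // discovered topic ID is allowed to override the last seen epoch.
       static boolean allowEpochOverride(final Uuid oldTopicId,
                                         final Uuid newTopicId) {
           return newTopicId != null
               && !newTopicId.equals(Uuid.ZERO_UUID)  // valid new ID
               && !newTopicId.equals(oldTopicId);     // changed or previously unknown
       }
   }
   ```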


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925862925


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 2;

Review Comment:
   Oh, good find. I thought/expected that the threads were closed first, not 
afterwards...
   
   But there is only one admin client that is shared over all threads, so why 
can't we keep admin open, send the "remove" request, and close admin afterwards 
(to avoid creating a new admin)?
   
   I am also fine if each thread removes only itself, after it called 
consumer.close().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925862925


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 2;

Review Comment:
   Oh, good find. I thought/expected that the threads were closed first, not 
afterwards...
   
   But there is only one admin client that is shared over all threads, so why 
can't we keep admin open, send the "remove" request, and close admin afterwards 
(to avoid creating a new admin)? Guess it's not ideal to "split" the logic and 
close the admin outside of the centralized "private close()" method.
   
   I am also fine if each thread removes only itself, after it called 
consumer.close(). Might even be more elegant if it's not too clumsy to 
implement.
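   
   A rough sketch of the per-thread variant, again assuming static membership 
so a member can be addressed by its group.instance.id (hypothetical helper, not 
the actual StreamThread code):
   
   ```
   import java.util.Collections;

   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.MemberToRemove;
   import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
   import org.apache.kafka.clients.consumer.Consumer;

   public class PerThreadLeaveGroup {

       // Sketch: each thread closes its own consumer first, then removes
       // itself from the group via the shared admin client (which stays
       // open until the last thread is done).
       static void closeAndLeave(final Consumer<?, ?> mainConsumer,
                                 final Admin sharedAdmin,
                                 final String applicationId,
                                 final String groupInstanceId) throws Exception {
           mainConsumer.close();
           sharedAdmin.removeMembersFromConsumerGroup(
                   applicationId,
                   new RemoveMembersFromConsumerGroupOptions(
                       Collections.singletonList(new MemberToRemove(groupInstanceId))))
               .all()
               .get();
       }
   }
   ```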



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12397: KAFKA-10199: Cleanup TaskManager and Task interfaces

2022-07-20 Thread GitBox


guozhangwang commented on code in PR #12397:
URL: https://github.com/apache/kafka/pull/12397#discussion_r925880702


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1307,9 +1307,9 @@ public Set characteristics() {
 }
 
 @SafeVarargs
-public static  Set union(final Supplier> constructor, final 
Set... set) {
+public static  Set union(final Supplier> constructor, final 
Collection... set) {
 final Set result = constructor.get();
-for (final Set s : set) {
+for (final Collection s : set) {

Review Comment:
   Good point, I also feel using `Set` is safer for de-dup effects. Will change 
it back.
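   
   A quick standalone illustration of the point, using the `Set`-typed 
signature from the diff (a sketch, not the actual Utils class):
   
   ```
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Set;
   import java.util.function.Supplier;

   public class UnionDedup {

       @SafeVarargs
       static <T> Set<T> union(final Supplier<Set<T>> constructor, final Set<T>... sets) {
           final Set<T> result = constructor.get();
           for (final Set<T> s : sets) {
               result.addAll(s);
           }
           return result;
       }

       public static void main(final String[] args) {
           final Set<String> a = new HashSet<>(Arrays.asList("t1", "t2"));
           final Set<String> b = new HashSet<>(Arrays.asList("t2", "t3"));
           // Taking Set (rather than Collection) documents that each input
           // is already de-duplicated; the union contains t1, t2, t3 once.
           System.out.println(union(HashSet::new, a, b));
       }
   }
   ```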



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12397: KAFKA-10199: Cleanup TaskManager and Task interfaces

2022-07-20 Thread GitBox


guozhangwang commented on code in PR #12397:
URL: https://github.com/apache/kafka/pull/12397#discussion_r925881686


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java:
##
@@ -55,25 +57,28 @@ public abstract class AbstractTask implements Task {
 protected Map offsetSnapshotSinceLastFlush = null;
 
 protected final TaskId id;
+protected final TaskConfig config;
 protected final ProcessorTopology topology;
 protected final StateDirectory stateDirectory;
 protected final ProcessorStateManager stateMgr;
+
 private final long taskTimeoutMs;
 
 AbstractTask(final TaskId id,
  final ProcessorTopology topology,
  final StateDirectory stateDirectory,
  final ProcessorStateManager stateMgr,
  final Set inputPartitions,
- final long taskTimeoutMs,
+ final TaskConfig config,
  final String taskType,
  final Class clazz) {
 this.id = id;
 this.stateMgr = stateMgr;
 this.topology = topology;
+this.config = config;
 this.inputPartitions = inputPartitions;
 this.stateDirectory = stateDirectory;
-this.taskTimeoutMs = taskTimeoutMs;
+this.taskTimeoutMs = config.taskTimeoutMs;

Review Comment:
   Ack.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12397: KAFKA-10199: Cleanup TaskManager and Task interfaces

2022-07-20 Thread GitBox


guozhangwang commented on code in PR #12397:
URL: https://github.com/apache/kafka/pull/12397#discussion_r925882848


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -34,29 +33,40 @@
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.utils.Utils.filterMap;
+import static org.apache.kafka.common.utils.Utils.union;
+
+/**
+ * All tasks contained by the Streams instance.
+ *
+ * Note that these tasks are shared between the TaskManager (stream thread) 
and the StateUpdater (restore thread),
+ * i.e. all running active tasks are processed by the former and all restoring 
active tasks and standby tasks are
+ * processed by the latter.
+ */
 class Tasks {
 private final Logger log;
-private final TopologyMetadata topologyMetadata;
-
-private final Map allTasksPerId = 
Collections.synchronizedSortedMap(new TreeMap<>());
-private final Map readOnlyTasksPerId = 
Collections.unmodifiableMap(allTasksPerId);
-private final Collection readOnlyTasks = 
Collections.unmodifiableCollection(allTasksPerId.values());
 
 // TODO: change type to `StreamTask`
 private final Map activeTasksPerId = new TreeMap<>();
+// TODO: change type to `StandbyTask`
+private final Map standbyTasksPerId = new TreeMap<>();
+
+// Tasks may have been assigned for a NamedTopology that is not yet known 
by this host. When that occurs we stash
+// these unknown tasks until either the corresponding NamedTopology is 
added and we can create them at last, or
+// we receive a new assignment and they are revoked from the thread.
+
+// Tasks may have been assigned but not yet created because:
+// 1. They are for a NamedTopology that is yet known by this host.
+// 2. They are to be recycled from an existing restoring task yet to be 
returned from the task updater.

Review Comment:
   Oops, my bad :) thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12397: KAFKA-10199: Cleanup TaskManager and Task interfaces

2022-07-20 Thread GitBox


guozhangwang commented on code in PR #12397:
URL: https://github.com/apache/kafka/pull/12397#discussion_r925892759


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java:
##
@@ -234,6 +236,43 @@ public void closeCleanAndRecycleState() {
 log.info("Closed clean and recycled state");
 }
 
+/**
+ * Create an active task from this standby task without closing and 
re-initializing the state stores.
+ * The task should have been in suspended state when calling this function
+ *
+ * TODO: we should be able to not need the input partitions as input param 
in future but always reuse
+ *   the task's input partitions when we have fixed partitions -> 
tasks mapping
+ */
+public StreamTask recycle(final Time time,

Review Comment:
   I re-thought about this a bit, and I feel that just having `recycle` in the 
task is not sufficient as a refactoring, since we'd still need to call, in 
order: 1) task.closeCleanAndRecycleState(), 2) taskCreator.createXTaskFromY(), 
which would call 3) task.recycle().
   
   I think it's better to extract the creators out of the `Tasks` (then also 
extract the main consumer as well since it's only needed for the creation of 
the tasks) into `TaskManager`, and merge the above steps into a single 
function. But since this PR is getting complicated enough I'd probably defer 
this to another PR.
   
   I could either revert this function or just keep it here without the test 
coverage, and then further consolidate the logic in a follow-up PR. Which one 
do you prefer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925894827


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 2;

Review Comment:
   Ok, I moved leaving the CG into the shutdown helper.  I think that'll do 
it...  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925895182


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1474,7 +1482,7 @@ private void closeToError() {
 if (!setState(State.PENDING_ERROR)) {
 log.info("Skipping shutdown since we are already in " + state());
 } else {
-final Thread shutdownThread = shutdownHelper(true);
+final Thread shutdownThread = shutdownHelper(true, Long.MAX_VALUE, 
false);

Review Comment:
   Of all the refactoring, this is probably the "scariest".  @mjsax thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12397: KAFKA-10199: Cleanup TaskManager and Task interfaces

2022-07-20 Thread GitBox


cadonna commented on code in PR #12397:
URL: https://github.com/apache/kafka/pull/12397#discussion_r925917621


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java:
##
@@ -234,6 +236,43 @@ public void closeCleanAndRecycleState() {
 log.info("Closed clean and recycled state");
 }
 
+/**
+ * Create an active task from this standby task without closing and 
re-initializing the state stores.
+ * The task should have been in suspended state when calling this function
+ *
+ * TODO: we should be able to not need the input partitions as input param 
in future but always reuse
+ *   the task's input partitions when we have fixed partitions -> 
tasks mapping
+ */
+public StreamTask recycle(final Time time,

Review Comment:
   Yeah, I was also wondering if we can simplify this chain of calls from 
`TaskManager` -> `Tasks` -> `StandbyTaskCreator`/`ActiveTaskCreator` -> 
`StreamTask`/`StandbyTask`. And I also thought it might be better to postpone 
this refactoring. 🙂  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12397: KAFKA-10199: Cleanup TaskManager and Task interfaces

2022-07-20 Thread GitBox


guozhangwang commented on code in PR #12397:
URL: https://github.com/apache/kafka/pull/12397#discussion_r925945110


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java:
##
@@ -216,8 +216,9 @@ long partitionTimestamp(final TopicPartition partition) {
 }
 
 // creates queues for new partitions, removes old queues, saves cached 
records for previously assigned partitions
-void updatePartitions(final Set newInputPartitions, final 
Function recordQueueCreator) {
+void updatePartitions(final Set inputPartitions, final 
Function recordQueueCreator) {

Review Comment:
   @cadonna This is a bug exposed by the unit test now: we should not modify 
the passed-in params directly.
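   
   i.e., the method should operate on a defensive copy rather than mutating the 
caller's set; a minimal sketch (hypothetical helper, not the actual 
PartitionGroup code):
   
   ```
   import java.util.HashSet;
   import java.util.Set;

   import org.apache.kafka.common.TopicPartition;

   public class DefensiveCopy {

       // Sketch of the fix: copy the passed-in partitions before any
       // add/remove logic, so the caller's collection is never modified.
       static Set<TopicPartition> copyBeforeUse(final Set<TopicPartition> inputPartitions) {
           final Set<TopicPartition> newInputPartitions = new HashSet<>(inputPartitions);
           // ... create queues for new partitions, remove old queues, etc.,
           // operating only on newInputPartitions ...
           return newInputPartitions;
       }
   }
   ```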



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925952015


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1431,6 +1434,10 @@ private Thread shutdownHelper(final boolean error) {
 }
 
 private boolean close(final long timeoutMs) {

Review Comment:
   Do we still need this helper? Seems it's called only in a single place?



##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1521,43 +1529,35 @@ public synchronized boolean close(final CloseOptions 
options) throws IllegalArgu
 if (timeoutMs < 0) {
 throw new IllegalArgumentException("Timeout can't be negative.");
 }
+log.debug("Stopping Streams client with timeoutMillis = {} ms.", 
timeoutMs);
+return close(timeoutMs, options.leaveGroup);

Review Comment:
   Seems we are missing a non-null check for `options`? (Unrelated to this PR, 
but we should just fix it on the side -- should be the first line of code in 
this method.)
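   
   For illustration, a minimal, self-contained sketch of the fail-fast check 
being suggested (the stand-in CloseOptions class is only here so the snippet 
compiles; the real method lives in KafkaStreams):
   
   ```java
   import java.util.Objects;
   
   class CloseOptionsNullCheckSketch {
       // Stand-in for KafkaStreams.CloseOptions, only for compilation.
       static class CloseOptions { }
   
       boolean close(final CloseOptions options) {
           // Fail fast before any other validation runs.
           Objects.requireNonNull(options, "options cannot be null");
           // ... timeout validation and shutdown logic would follow ...
           return true;
       }
   }
   ```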



##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 1;
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUS

[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925960794


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1431,6 +1434,10 @@ private Thread shutdownHelper(final boolean error) {
 }
 
 private boolean close(final long timeoutMs) {

Review Comment:
   Good catch.  Removing it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14090) Allow Kafka Streams to be configured to not create internal topics

2022-07-20 Thread Abraham Leal (Jira)
Abraham Leal created KAFKA-14090:


 Summary: Allow Kafka Streams to be configured to not create 
internal topics
 Key: KAFKA-14090
 URL: https://issues.apache.org/jira/browse/KAFKA-14090
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 3.2.0
Reporter: Abraham Leal


There should be a way to instruct Kafka Streams, through configuration, not to 
create internal topics on start-up, and to fail if the needed internal topics 
aren't there.

The reasoning for this option is governance: an organization may wish to 
disallow topic creation by clients and require all topic creation to go through 
administrators or a dedicated process. Injecting this property into all clients 
would ensure central governance of the backing Kafka cluster.
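
A minimal sketch of what such a configuration could look like. The property 
name below is hypothetical (no such config exists in Kafka Streams today) and 
only illustrates the requested behavior:

```java
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class GovernedStreamsApp {
    public static KafkaStreams start(final Topology topology) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "governed-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Hypothetical property: with it set to false, Streams would fail fast
        // if repartition/changelog topics are missing instead of creating them.
        props.put("allow.internal.topic.creation", "false");
        return new KafkaStreams(topology, props);
    }
}
```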



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925966739


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1474,7 +1482,7 @@ private void closeToError() {
 if (!setState(State.PENDING_ERROR)) {
 log.info("Skipping shutdown since we are already in " + state());
 } else {
-final Thread shutdownThread = shutdownHelper(true);
+final Thread shutdownThread = shutdownHelper(true, Long.MAX_VALUE, 
false);

Review Comment:
   What `timeoutMs`?  
   
   There's no `timeoutMs` in scope.
   
   The method `closeToError` is called in a few places; not sure if we could 
add a timeout to each of them...



##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1521,43 +1529,35 @@ public synchronized boolean close(final CloseOptions 
options) throws IllegalArgu
 if (timeoutMs < 0) {
 throw new IllegalArgumentException("Timeout can't be negative.");
 }
+log.debug("Stopping Streams client with timeoutMillis = {} ms.", 
timeoutMs);
+return close(timeoutMs, options.leaveGroup);

Review Comment:
   +1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925969588


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 1;
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Before
+public void before() throws Exception {
+mockTime = CLUSTER.time;
+
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+commonClientConfig = new Properties();
+commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+
+streamsConfig = new Properties();
+streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
+streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+   

[GitHub] [kafka] rhauch commented on a diff in pull request #12424: RELEASE: Add 3.2 upgrade docs

2022-07-20 Thread GitBox


rhauch commented on code in PR #12424:
URL: https://github.com/apache/kafka/pull/12424#discussion_r925969756


##
docs/upgrade.html:
##
@@ -61,6 +61,22 @@ Upgrading to 
3.2.0 from any vers
 
 
 
+Notable changes in 
3.2.1
+
+A problem with OAuth token refresh was fixed. During token 
refresh, the Kafka client was not including
+the SASL extensions which were present during the initial 
authentication. This leads to authentication
failures. See https://issues.apache.org/jira/browse/KAFKA-14062 for 
details.
+A problem in Connect with error.tolerance was fixed. 
Starting in 3.2.0, when

Review Comment:
   ```suggestion
   A problem in Connect with errors.tolerance was 
fixed. Starting in 3.2.0, when
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925971253


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 1;
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Before
+public void before() throws Exception {
+mockTime = CLUSTER.time;
+
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+commonClientConfig = new Properties();
+commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+
+streamsConfig = new Properties();
+streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
+streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+   

[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925972749


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends 
AbstractResetIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+protected static final int TIMEOUT_MULTIPLIER = 15;
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Before
+public void before() throws Exception {
+mockTime = CLUSTER.time;
+
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+commonClientConfig = new Properties();
+commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+
+streamsConfig = new Properties();
+streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
+streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
S

[GitHub] [kafka] mumrah merged pull request #12424: RELEASE: Add 3.2 upgrade docs

2022-07-20 Thread GitBox


mumrah merged PR #12424:
URL: https://github.com/apache/kafka/pull/12424


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13897) Add 3.1.1 to system tests and streams upgrade tests

2022-07-20 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-13897:
-
Fix Version/s: (was: 3.2.1)

> Add 3.1.1 to system tests and streams upgrade tests
> ---
>
> Key: KAFKA-13897
> URL: https://issues.apache.org/jira/browse/KAFKA-13897
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Tom Bentley
>Priority: Blocker
> Fix For: 3.3.0, 3.1.2
>
>
> Per the penultimate bullet on the [release 
> checklist|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses],
>  Kafka v3.1.1 is released. We should add this version to the system tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14091) Suddenly-killed tasks can leave hanging transactions open

2022-07-20 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14091:
-

 Summary: Suddenly-killed tasks can leave hanging transactions open
 Key: KAFKA-14091
 URL: https://issues.apache.org/jira/browse/KAFKA-14091
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Chris Egerton


Right now, if a task running with exactly-once support is killed ungracefully, 
it may leave a hanging transaction open. If the transaction included writes to 
the offsets topic, then startup for future workers becomes blocked on that 
transaction expiring.

Ideally, we could identify these kinds of hanging transactions and proactively 
abort them.

Unfortunately, there are a few facts that make this fairly complicated:
 # Workers read to the end of the offsets topic during startup, before joining 
the cluster
 # Workers do not know which tasks they are assigned until they join the cluster

The result of these facts is that we cannot trust workers that are restarted 
shortly after being ungracefully shut down to fence out their own hanging 
transactions, since any hanging transactions would prevent them from being able 
to join the group and receive their task assignment in the first place.

We could possibly accomplish this by having the leader proactively abort any 
open transactions for tasks on workers that appear to have left the cluster 
during a rebalance. This would not require us to wait for the scheduled 
rebalance delay to elapse, since the intent of the delay is to provide a buffer 
between when workers leave and when their connectors/tasks are reallocated 
across the cluster (and, if the worker is able to rejoin before that buffer is 
consumed, then give it back the same connectors/tasks it was running 
previously); aborting transactions for tasks on these workers would not 
interfere with that goal.

 

It's also possible that we may have to handle the case where a 
[cancelled|https://github.com/apache/kafka/blob/badfbacdd09a9ee8821847f4b28d98625f354ed7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L274-L287]
 task leaves a transaction open; I have yet to confirm whether this is 
possible, though.
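
For reference, a hedged sketch of what "proactively abort them" could look 
like using the KIP-664 Admin APIs (this is not Connect's actual implementation; 
partition selection and error handling are omitted):

```java
import java.util.Collections;

import org.apache.kafka.clients.admin.AbortTransactionSpec;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.common.TopicPartition;

class HangingTransactionAborter {
    // Aborts any transaction still open on the given offsets-topic partition.
    static void abortOpenTransactions(final Admin admin, final TopicPartition offsetsPartition) throws Exception {
        for (final ProducerState producer : admin.describeProducers(Collections.singleton(offsetsPartition))
                .partitionResult(offsetsPartition).get().activeProducers()) {
            // A present transaction start offset means the producer has an open transaction.
            if (producer.currentTransactionStartOffset().isPresent()) {
                admin.abortTransaction(new AbortTransactionSpec(
                        offsetsPartition,
                        producer.producerId(),
                        (short) producer.producerEpoch(),
                        producer.coordinatorEpoch().orElse(0))).all().get();
            }
        }
    }
}
```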



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #11783: KAFKA-10000: System tests (KIP-618)

2022-07-20 Thread GitBox


C0urante commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1190816405

   Ah, thanks for the update Luke! In the meantime, I've discovered and 
addressed a few more issues that surfaced during my local runs yesterday:
   
   Since follower workers don't retry when zombie fencing requests to the 
leader fail (which is intentional, as we want to be able to surface failures 
caused by things like insufficient ACLs to perform a round of fencing), it's 
possible that a task that's hosted on a follower may fail during startup if the 
leader has just been bounced and its worker process hasn't started yet. I've 
added a small step to restart any failed tasks after all the bounces have 
completed and before we check to make sure that the connector and its tasks are 
healthy.
   
   Since the REST API is available before workers have actually completed 
startup, it's also possible that requests to fence zombies (and submit task 
configs) can be made to the leader before it has been able to read a session 
key from the config topic. I've tweaked the herder logic to catch this case and 
throw a 503 error with a user-friendly error message. I experimented with some 
other approaches to automatically refresh the leader's view of the config topic 
in this case, and/or handle request signature validation on the herder's tick 
thread (which would ensure that the worker had been able to complete startup 
and read to the current end of the config topic), but the additional complexity 
incurred by these options didn't seem worth the benefits since they would still 
be incomplete for cases like the one described above.
   
   It's also possible that, when hard-bouncing a worker, a transaction opened 
by one of its tasks gets left hanging. If the task has begun to write offsets, 
then startup for subsequent workers will be blocked on the expiration of that 
transaction, which by default takes 60 seconds. This can cause test failures 
because we usually wait for 60 seconds for workers to complete startup. To 
address this, I've lowered the transaction timeout to 10 seconds. Ideally, we 
could proactively abort any open transactions left behind by prior task 
generations during zombie fencing, but it's probably too late to add this kind 
of logic in time for the 3.3.0 release. I've filed 
https://issues.apache.org/jira/browse/KAFKA-14091 to track this.
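   
   As an illustration of that timeout change (the worker-level `producer.` 
prefix for overriding producer configs is standard Connect behavior; the 
surrounding test setup is assumed):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   import org.apache.kafka.clients.producer.ProducerConfig;
   
   class WorkerPropsSketch {
       static Map<String, String> exactlyOnceWorkerProps() {
           final Map<String, String> workerProps = new HashMap<>();
           workerProps.put("exactly.once.source.support", "enabled");
           // Lower the 60-second default so hanging transactions left by
           // hard-bounced workers expire in ~10 seconds during the test.
           workerProps.put("producer." + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "10000");
           return workerProps;
       }
   }
   ```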
   
   There's also a possible NPE in `KafkaBasedLog` caused by yet another unsafe 
use of `Utils::closeQuietly`. It's not a major issue since it only occurs when 
the log is shut down before it has had a chance to start, but it's still worth 
patching.
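   
   A hedged illustration of that failure mode (the field name is hypothetical, 
not the exact `KafkaBasedLog` code): binding a method reference such as 
`producer::close` throws an NPE immediately when the receiver is null, before 
`closeQuietly`'s own null handling can help.
   
   ```java
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.common.utils.Utils;
   
   class CloseQuietlySketch {
       public static void main(final String[] args) {
           // Simulates stopping the log before start(): the producer was never created.
           final KafkaProducer<byte[], byte[]> producer = null;
           // Utils.closeQuietly(producer::close, "producer");  // would NPE while binding the reference
           Utils.closeQuietly(producer != null ? producer::close : null, "producer"); // safe no-op
       }
   }
   ```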
   
   I've kicked off another local run of `test_exactly_once_source` with unclean 
shutdown and the `sessioned` protocol after applying these changes. I've only 
completed five tests so far, but they've all succeeded. Will report the results 
after the other ninety-five runs have completed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14089) Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic

2022-07-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-14089:
-

Assignee: Chris Egerton

> Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
> ---
>
> Key: KAFKA-14089
> URL: https://issues.apache.org/jira/browse/KAFKA-14089
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Attachments: failure.txt
>
>
> It looks like the sequence got broken around "65535, 65537, 65536, 65539, 
> 65538, 65541, 65540, 65543"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14089) Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic

2022-07-20 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569204#comment-17569204
 ] 

Chris Egerton commented on KAFKA-14089:
---

Thanks [~mimaison]. We don't assert on order of records, just that the expected 
seqnos were present in any order, so the wonkiness around 65535 isn't actually 
an issue (and it's even present in the stringified representation of both the 
expected _and_ the actual seqno sets).

 

After doing some Bash scrubbing on the file attached to the ticket, it looks 
like seqnos start to be missing (i.e., they're in the expected set but not the 
actual) between 114463 and 114754. Not every seqno in that range is missing, 
but there's 105 in total. After that, starting at 114755, there's 105 extra 
(i.e., in the actual set but not the expected) seqnos.

 

Given that the issues crop up at the very end of the seqno set, it seems like 
this could be caused by non-graceful shutdown of the worker after exactly-once 
support is disabled, or even possibly the recently-discovered KAFKA-14079. It's 
a little worrisome, though, since the results here indicate possible data loss.

 

If this was on Jenkins, do you have a link to the CI run that caused it? Or if 
it was encountered elsewhere, do you have any logs available? I'll try to kick 
off some local runs but I'm in the middle of stress-testing my laptop with the 
latest KIP-618 system tests and may not be able to reproduce locally.

> Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
> ---
>
> Key: KAFKA-14089
> URL: https://issues.apache.org/jira/browse/KAFKA-14089
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Attachments: failure.txt
>
>
> It looks like the sequence got broken around "65535, 65537, 65536, 65539, 
> 65538, 65541, 65540, 65543"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14089) Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic

2022-07-20 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569204#comment-17569204
 ] 

Chris Egerton edited comment on KAFKA-14089 at 7/20/22 11:28 PM:
-

Thanks [~mimaison]. We don't assert on order of records, just that the expected 
seqnos were present in any order, so the wonkiness around 65535 isn't actually 
an issue (and it's even present in the stringified representation of both the 
expected _and_ the actual seqno sets).

 

After doing some Bash scrubbing on the file attached to the ticket, it looks 
like seqnos start to be missing (i.e., they're in the expected set but not the 
actual) between 114463 and 114754. Not every seqno in that range is missing, 
but there's 105 missing in total. After that, starting at 114755, there's 105 
extra (i.e., in the actual set but not the expected) seqnos.

 

Given that the issues crop up at the very end of the seqno set, it seems like 
this could be caused by non-graceful shutdown of the worker after exactly-once 
support is disabled, or even possibly the recently-discovered KAFKA-14079. It's 
a little worrisome, though, since the results here indicate possible data loss.

 

If this was on Jenkins, do you have a link to the CI run that caused it? Or if 
it was encountered elsewhere, do you have any logs available? I'll try to kick 
off some local runs but I'm in the middle of stress-testing my laptop with the 
latest KIP-618 system tests and may not be able to reproduce locally.


was (Author: chrisegerton):
Thanks [~mimaison]. We don't assert on order of records, just that the expected 
seqnos were present in any order, so the wonkiness around 65535 isn't actually 
an issue (and it's even present in the stringified representation of both the 
expected _and_ the actual seqno sets).

 

After doing some Bash scrubbing on the file attached to the ticket, it looks 
like seqnos start to be missing (i.e., they're in the expected set but not the 
actual) between 114463 and 114754. Not every seqno in that range is missing, 
but there's 105 in total. After that, starting at 114755, there's 105 extra 
(i.e., in the actual set but not the expected) seqnos.

 

Given that the issues crop up at the very end of the seqno set, it seems like 
this could be caused by non-graceful shutdown of the worker after exactly-once 
support is disabled, or even possibly the recently-discovered KAFKA-14079. It's 
a little worrisome, though, since the results here indicate possible data loss.

 

If this was on Jenkins, do you have a link to the CI run that caused it? Or if 
it was encountered elsewhere, do you have any logs available? I'll try to kick 
off some local runs but I'm in the middle of stress-testing my laptop with the 
latest KIP-618 system tests and may not be able to reproduce locally.

> Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
> ---
>
> Key: KAFKA-14089
> URL: https://issues.apache.org/jira/browse/KAFKA-14089
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Attachments: failure.txt
>
>
> It looks like the sequence got broken around "65535, 65537, 65536, 65539, 
> 65538, 65541, 65540, 65543"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14089) Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic

2022-07-20 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569204#comment-17569204
 ] 

Chris Egerton edited comment on KAFKA-14089 at 7/20/22 11:41 PM:
-

Thanks [~mimaison]. We don't assert on order of records, just that the expected 
seqnos were present in any order, so the wonkiness around 65535 isn't actually 
an issue (and it's even present in the stringified representation of both the 
expected _and_ the actual seqno sets).

After doing some Bash scrubbing on the file attached to the ticket, it looks 
like seqnos start to be missing (i.e., they're in the expected set but not the 
actual) between 114463 and 114754. Not every seqno in that range is missing, 
but there's 105 missing in total. After that, starting at 114755, there's 105 
extra (i.e., in the actual set but not the expected) seqnos.

Given that the issues crop up at the very end of the seqno set, it seems like 
this could be caused by non-graceful shutdown of the worker after exactly-once 
support is disabled, or even possibly the recently-discovered KAFKA-14079. 
-It's a little worrisome, though, since the results here indicate possible data 
loss.- Actually, on second thought, this is probably not data loss, since we're 
reading the records that have been produced to Kafka, but not necessarily the 
records whose offsets have been committed.

If this was on Jenkins, do you have a link to the CI run that caused it? Or if 
it was encountered elsewhere, do you have any logs available? I'll try to kick 
off some local runs but I'm in the middle of stress-testing my laptop with the 
latest KIP-618 system tests and may not be able to reproduce locally.

I suspect a fix for this would involve reading the last-committed offset for 
each task, then only checking seqnos for that task up to the seqno in that 
offset. But I'd like to have a better idea of what exactly is causing the 
failure before pulling the trigger on that, especially if it's unclean 
task/worker shutdown and we can find a way to fix that instead of adjusting our 
tests to handle sloppy shutdowns.
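
A minimal sketch of that assertion change (the helper shape is an assumption, 
not the test's actual code): bound the expected seqnos by the seqno recorded in 
the task's last committed offset, so records produced after the final commit 
can't fail the check.

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class SeqnoCheckSketch {
    // True if every seqno up to the task's last committed seqno was consumed.
    static boolean seqnosCompleteUpToCommit(final Set<Long> actualSeqnos, final long lastCommittedSeqno) {
        final Set<Long> expected = LongStream.rangeClosed(1, lastCommittedSeqno)
                .boxed()
                .collect(Collectors.toSet());
        return actualSeqnos.containsAll(expected);
    }
}
```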


was (Author: chrisegerton):
Thanks [~mimaison]. We don't assert on order of records, just that the expected 
seqnos were present in any order, so the wonkiness around 65535 isn't actually 
an issue (and it's even present in the stringified representation of both the 
expected _and_ the actual seqno sets).

 

After doing some Bash scrubbing on the file attached to the ticket, it looks 
like seqnos start to be missing (i.e., they're in the expected set but not the 
actual) between 114463 and 114754. Not every seqno in that range is missing, 
but there's 105 missing in total. After that, starting at 114755, there's 105 
extra (i.e., in the actual set but not the expected) seqnos.

 

Given that the issues crop up at the very end of the seqno set, it seems like 
this could be caused by non-graceful shutdown of the worker after exactly-once 
support is disabled, or even possibly the recently-discovered KAFKA-14079. It's 
a little worrisome, though, since the results here indicate possible data loss.

 

If this was on Jenkins, do you have a link to the CI run that caused it? Or if 
it was encountered elsewhere, do you have any logs available? I'll try to kick 
off some local runs but I'm in the middle of stress-testing my laptop with the 
latest KIP-618 system tests and may not be able to reproduce locally.

> Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
> ---
>
> Key: KAFKA-14089
> URL: https://issues.apache.org/jira/browse/KAFKA-14089
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Attachments: failure.txt
>
>
> It looks like the sequence got broken around "65535, 65537, 65536, 65539, 
> 65538, 65541, 65540, 65543"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito

2022-07-20 Thread GitBox


C0urante commented on code in PR #12422:
URL: https://github.com/apache/kafka/pull/12422#discussion_r926146499


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java:
##
@@ -53,64 +50,72 @@ public class WorkerConfigTransformerTest {
 public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL";
 public static final String TEST_RESULT_WITH_LONGER_TTL = 
"testResultWithLongerTTL";
 
-@Mock private Herder herder;
-@Mock private Worker worker;
-@Mock private HerderRequest requestId;
+private final Herder herder = Mockito.mock(Herder.class);
+private final Worker worker = Mockito.mock(Worker.class);
+private final HerderRequest requestId = Mockito.mock(HerderRequest.class);

Review Comment:
   It's pretty straightforward to replace `@RunWith(MockitoJUnitRunner.class)` 
with `@ExtendWith(MockitoExtension.class)`, and we're already using that 
approach in other modules (such as 
[here](https://github.com/apache/kafka/blob/badfbacdd09a9ee8821847f4b28d98625f354ed7/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java#L36)).
 We also get to leverage things like the [`@MockitoSettings` 
annotation](https://www.javadoc.io/doc/org.mockito/mockito-junit-jupiter/2.19.1/org/mockito/junit/jupiter/MockitoSettings.html)
 if we stick to this style.
   
   @ijuma since it's only going to be a one-line (well, excluding imports) 
change per test, do you think we could stick with `@RunWith` for now in our 
Mockito tests?
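   
   For reference, a minimal side-by-side sketch of the two styles (the 
`Service` interface is a stand-in for mocked types like `Herder`, just so the 
snippet compiles):
   
   ```java
   import org.junit.jupiter.api.extension.ExtendWith;
   import org.junit.runner.RunWith;
   import org.mockito.Mock;
   import org.mockito.junit.MockitoJUnitRunner;
   import org.mockito.junit.jupiter.MockitoExtension;
   import org.mockito.junit.jupiter.MockitoSettings;
   import org.mockito.quality.Strictness;
   
   interface Service { }  // stand-in for Herder/Worker, only for compilation
   
   @RunWith(MockitoJUnitRunner.class)   // JUnit 4 runner style
   class JUnit4StyleTest {
       @Mock Service service;
   }
   
   @ExtendWith(MockitoExtension.class)  // JUnit 5 extension style
   @MockitoSettings(strictness = Strictness.STRICT_STUBS)
   class JUnit5StyleTest {
       @Mock Service service;
   }
   ```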



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] splett2 commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-20 Thread GitBox


splett2 commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r926147113


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,98 @@ class SocketServerTest {
 }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing a request got delayed by 300 ms under 
the following conditions:
+   * 1. Client-server communication uses an SSL socket.
+   * 2. More than one request is read from the socket into 
SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer 
has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one request on the socket 
and puts them in the buffer
+   *  (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores them in 
SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in 
SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This 
request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, 
"select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following the "makeSocketWithBufferedRequests()" 
method, putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually triggering the 
processing.

Review Comment:
   This comment is written largely in the context of the JIRA ticket. I think 
we can write it more in the context of the current code, e.g. replace 
references to `300 ms` with references to `poll timeout` so that if the poll 
timeout is changed, the comment stays consistent.
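   
   A hedged sketch of the scheduling idea behind the fix (the names are 
assumptions, not Kafka's actual `Selector` internals): never block in 
`select()` while a channel still has buffered, readable data.
   
   ```java
   class PollTimeoutSketch {
       // Returns the timeout to pass to select(): 0 (non-blocking) when buffered
       // data is still pending, otherwise the configured poll timeout.
       static long effectivePollTimeout(final boolean hasBufferedReadableData, final long configuredTimeoutMs) {
           return hasBufferedReadableData ? 0L : configuredTimeoutMs;
       }
   }
   ```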



##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,98 @@ class SocketServerTest {
 }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing a request got delayed by 300 ms under 
the following conditions:
+   * 1. Client-server communication uses an SSL socket.
+   * 2. More than one request is read from the socket into 
SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer 
has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one request on the socket 
and puts them in the buffer
+   *  (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores them in 
SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in 
SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This 
request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, 
"select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following the "makeSocketWithBufferedRequests()" 
method, putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually triggering the 
processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+shutdownServerAndMetrics(server)
+
+// create server with SSL listener
+val testableServer = new 
TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+testableServer.enableRequestProcessing(Map.empty)
+val testableSelector = testableServer.testableSelector
+val proxyServer = new ProxyServer(testableServer)
+val selectTimeout = 5000  // in ms
+// set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is 
distinct and can be identified
+testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+try {
+  // trigger SSL handshake by sending the first request and receiving its 
response without buffering
+  val requestBytes = producerRequestBytes()
+  val sslSocket = sslClientSocket(proxyServer.localPort)
+
+  sendRequest(sslSocket, requestBytes)
+  val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+  processRequest(testableServer.dataPlaneRequestChannel, request1)
+  receiveResponse(sslSocket)
+
+  // then put 2 requests in SslTransportLayer.netReadBuffer via the 
ProxyServer
+  val connectionId = request1.context.connectionId
+  val listener = 
testableServer.config.dataPlaneListeners.head.listenerName.value
+  val channel = 
testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw
 new IllegalStateException("Channel not found"))
+  val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, 
classOf[KafkaChannel], "transportLayer")
+  val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, 
clas

[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r926113246


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1516,48 +1520,41 @@ public synchronized boolean close(final Duration 
timeout) throws IllegalArgument
  * @throws IllegalArgumentException if {@code timeout} can't be 
represented as {@code long milliseconds}
  */
 public synchronized boolean close(final CloseOptions options) throws 
IllegalArgumentException {
+Objects.requireNonNull(options);

Review Comment:
   ```suggestion
   Objects.requireNonNull(options, "options cannot be null");
   ```



##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 1;
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Before
+public void before() throws Exception {
+mockTime = CLUSTER.time;
+
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+commonClientCon

[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r926113971


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 1;
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Before
+public void before() throws Exception {
+mockTime = CLUSTER.time;
+
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+commonClientConfig = new Properties();
+commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+
+streamsConfig = new Properties();
+streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
+streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+streamsC

[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-20 Thread GitBox


aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r926201391


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -1299,17 +1300,79 @@ public void 
testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
 }
 }
 
+@Test
+public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+try (ConsumerCoordinator coordinator = 
prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+int generationId = 42;
+String memberId = "consumer-42";
+
+Timer pollTimer = time.timer(100L);
+client.prepareResponse(offsetCommitResponse(singletonMap(t1p, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+boolean res = coordinator.onJoinPrepare(pollTimer, generationId, 
memberId);
+assertFalse(res);
+
+pollTimer = time.timer(100L);
+client.prepareResponse(offsetCommitResponse(singletonMap(t1p, 
Errors.NONE)));
+res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+assertTrue(res);
+
+assertFalse(client.hasPendingResponses());
+assertFalse(client.hasInFlightRequests());
+assertFalse(coordinator.coordinatorUnknown());
+}
+}
+
+@Test
+public void 
testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterNonRetryableException() {
+try (ConsumerCoordinator coordinator = 
prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+int generationId = 42;
+String memberId = "consumer-42";
+
+Timer pollTimer = time.timer(100L);
+client.prepareResponse(offsetCommitResponse(singletonMap(t1p, 
Errors.UNKNOWN_MEMBER_ID)));
+boolean res = coordinator.onJoinPrepare(pollTimer, generationId, 
memberId);
+assertTrue(res);
+
+assertFalse(client.hasPendingResponses());
+assertFalse(client.hasInFlightRequests());
+assertFalse(coordinator.coordinatorUnknown());
+}
+}
+
+@Test
+public void 
testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterRebalanceTimeout() {
+try (ConsumerCoordinator coordinator = 
prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+int generationId = 42;
+String memberId = "consumer-42";
+
+Timer pollTimer = time.timer(100L);
+time.sleep(150);

Review Comment:
   okay~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
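
To make the rebalance-timeout case in the last test concrete: expiry is driven by
MockTime, not wall-clock waiting, so the timer has to be told that the clock moved.
A minimal sketch of that Timer/MockTime interplay (one reading of the test above,
not code from the PR):

{code:java}
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Timer;

public class TimerExpiryExample {

    public static void main(final String[] args) {
        final MockTime time = new MockTime();
        final Timer pollTimer = time.timer(100L);   // deadline 100 ms from "now"
        time.sleep(150L);                           // advance the mock clock past it
        pollTimer.update();                         // refresh the timer's cached time
        System.out.println(pollTimer.isExpired());  // true: the commit wait gives up
    }
}
{code}

Sleeping 150 ms against a 100 ms timer is what puts onJoinPrepare on the
"rebalance timeout reached, keep joining anyway" path exercised above.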



[GitHub] [kafka] dengziming commented on pull request #12294: KAFKA-13990: KRaft controller should return right features in ApiVersionResponse

2022-07-20 Thread GitBox


dengziming commented on PR #12294:
URL: https://github.com/apache/kafka/pull/12294#issuecomment-1190983840

   cc @mumrah 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r926216992


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 1;
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Before
+public void before() throws Exception {
+mockTime = CLUSTER.time;
+
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+commonClientConfig = new Properties();
+commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+
+streamsConfig = new Properties();
+streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
+streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+   

[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r926217527


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends 
AbstractResetIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+protected static final int TIMEOUT_MULTIPLIER = 15;
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Before
+public void before() throws Exception {
+mockTime = CLUSTER.time;
+
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+commonClientConfig = new Properties();
+commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+
+streamsConfig = new Properties();
+streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
+streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
S

[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread GitBox


mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r926285132


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.server.KafkaConfig;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+brokerProps.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), 
Integer.toString(Integer.MAX_VALUE));
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Before
+public void before() throws Exception {
+mockTime = CLUSTER.time;
+
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+commonClientConfig = new Properties();
+commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+
+streamsConfig = new Properties();
+streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
+streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+strea

[jira] [Assigned] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-14076:
---

Assignee: Jim Hughes

> Fix issues with KafkaStreams.CloseOptions
> -
>
> Key: KAFKA-14076
> URL: https://issues.apache.org/jira/browse/KAFKA-14076
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
>
> The new `close(CloseOptions)` function has a few bugs.  
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561)]
> Notably, it needs to remove CGs per StreamThread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
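
For reference, the user-facing API that KIP-812 adds, and that this ticket fixes,
looks roughly like the following. A minimal sketch, assuming an already-running
KafkaStreams instance; the helper name shutDownForScaleDown is made up for
illustration:

{code:java}
import java.time.Duration;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.CloseOptions;

public class CloseWithLeaveGroupExample {

    // Shut down a running KafkaStreams instance and ask each StreamThread's
    // consumer to leave the group, so a real scale-down triggers an immediate
    // rebalance instead of waiting for the session timeout to expire.
    static boolean shutDownForScaleDown(final KafkaStreams streams) {
        final CloseOptions options = new CloseOptions()
                .leaveGroup(true)                  // force the LeaveGroup path
                .timeout(Duration.ofSeconds(30));  // bound how long close() blocks
        return streams.close(options);             // true if shutdown completed in time
    }
}
{code}

The plain close() keeps the old skip-LeaveGroup behaviour; only leaveGroup(true)
opts into the immediate rebalance.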


[jira] [Updated] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14076:

Fix Version/s: 3.3.0

> Fix issues with KafkaStreams.CloseOptions
> -
>
> Key: KAFKA-14076
> URL: https://issues.apache.org/jira/browse/KAFKA-14076
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
> Fix For: 3.3.0
>
>
> The new `close(CloseOptions)` function has a few bugs.  
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561)]
> Notably, it needs to remove CGs per StreamThread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14076:

Priority: Blocker  (was: Major)

> Fix issues with KafkaStreams.CloseOptions
> -
>
> Key: KAFKA-14076
> URL: https://issues.apache.org/jira/browse/KAFKA-14076
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Blocker
> Fix For: 3.3.0
>
>
> The new `close(CloseOptions)` function has a few bugs.  
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561)]
> Notably, it needs to remove CGs per StreamThread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions

2022-07-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569282#comment-17569282
 ] 

Matthias J. Sax commented on KAFKA-14076:
-

Marking this as a blocker for 3.3, because this fixes KIP-812, which has a broken
implementation right now. We need to either get this merged or revert the KIP.

> Fix issues with KafkaStreams.CloseOptions
> -
>
> Key: KAFKA-14076
> URL: https://issues.apache.org/jira/browse/KAFKA-14076
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Blocker
> Fix For: 3.3.0
>
>
> The new `close(CloseOptions)` function has a few bugs.  
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561)]
> Notably, it needs to remove CGs per StreamThread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
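
Reading "remove CGs per StreamThread" as "remove each StreamThread's member from
the application's consumer group", one plausible shape of the fix is the Admin
API's removeMembersFromConsumerGroup call, which removes static members by their
group.instance.id. A hedged sketch, not the actual patch; the helper and
parameter names are illustrative:

{code:java}
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

public class RemoveThreadMembersSketch {

    // Remove one member per StreamThread from the application's consumer group
    // (the Streams application.id doubles as the group.id). This only works for
    // static members, i.e. threads that set group.instance.id.
    static void removeThreadMembers(final Admin admin,
                                    final String applicationId,
                                    final List<String> threadGroupInstanceIds)
            throws ExecutionException, InterruptedException {
        final List<MemberToRemove> members = threadGroupInstanceIds.stream()
                .map(MemberToRemove::new)          // identified by group.instance.id
                .collect(Collectors.toList());
        admin.removeMembersFromConsumerGroup(
                applicationId,
                new RemoveMembersFromConsumerGroupOptions(members))
            .all()
            .get();                                // wait for the coordinator's ack
    }
}
{code}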


[jira] [Commented] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-07-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569283#comment-17569283
 ] 

Matthias J. Sax commented on KAFKA-13217:
-

Just reviewed the PR and linked the tickets. PR LGTM, so we can merge before we 
cut the first RC and ship the KIP.

> Reconsider skipping the LeaveGroup on close() or add an overload that does so
> -
>
> Key: KAFKA-13217
> URL: https://issues.apache.org/jira/browse/KAFKA-13217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sayantanu Dey
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
> Fix For: 3.3.0
>
>
> In Kafka Streams, when an instance is shut down via the close() API, we 
> intentionally skip sending a LeaveGroup request. This is because often the 
> shutdown is not due to a scaling down event but instead some transient 
> closure, such as during a rolling bounce. In cases where the instance is 
> expected to start up again shortly after, we originally wanted to avoid having
> that member's tasks redistributed across the remaining group members, since
> this would disturb the stable assignment and could cause unnecessary
> state migration and restoration. We also hoped
> to limit the disruption to just a single rebalance, rather than forcing the 
> group to rebalance once when the member shuts down and then again when it 
> comes back up. So it's really an optimization for the case in which the
> shutdown is temporary.
>  
> That said, many of those optimizations are no longer necessary or at least 
> much less useful given recent features and improvements. For example 
> rebalances are now lightweight so skipping the 2nd rebalance is not as worth 
> optimizing for, and the new assignor will take into account the actual 
> underlying state for each task/partition assignment, rather than just the 
> previous assignment, so the assignment should be considerably more stable 
> across bounces and rolling restarts. 
>  
> Given that, it might be time to reconsider this optimization. Alternatively,
> we could introduce another form of the close() API that forces the member to
> leave the group, to be used in the event of an actual scale-down rather than a
> transient bounce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
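
For context on how the skip-LeaveGroup optimization described above is wired up:
Kafka Streams configures its main consumer with an internal flag so that closing
the consumer does not send a LeaveGroup request. The property below is an
internal, unsupported knob and may change; the sketch is for illustration only:

{code:java}
import java.util.Properties;

public class LeaveGroupOnCloseSketch {

    // Kafka Streams passes this internal flag to its main consumer so that
    // closing the consumer does NOT send a LeaveGroup request. Internal,
    // unsupported property -- shown only to illustrate the mechanism.
    static Properties mainConsumerOverrides() {
        final Properties props = new Properties();
        props.put("internal.leave.group.on.close", false);
        return props;
    }
}
{code}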


[jira] [Updated] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-07-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13217:

Description: 
In Kafka Streams, when an instance is shut down via the close() API, we 
intentionally skip sending a LeaveGroup request. This is because often the 
shutdown is not due to a scaling down event but instead some transient closure, 
such as during a rolling bounce. In cases where the instance is expected to 
start up again shortly after, we originally wanted to avoid having that member's
tasks redistributed across the remaining group members, since this would
disturb the stable assignment and could cause unnecessary state migration and 
restoration. We also hoped
to limit the disruption to just a single rebalance, rather than forcing the 
group to rebalance once when the member shuts down and then again when it comes 
back up. So it's really an optimization for the case in which the shutdown is
temporary.
 
That said, many of those optimizations are no longer necessary or at least much 
less useful given recent features and improvements. For example rebalances are 
now lightweight so skipping the 2nd rebalance is not as worth optimizing for, 
and the new assignor will take into account the actual underlying state for 
each task/partition assignment, rather than just the previous assignment, so 
the assignment should be considerably more stable across bounces and rolling 
restarts. 
 
Given that, it might be time to reconsider this optimization. Alternatively, we
could introduce another form of the close() API that forces the member to leave
the group, to be used in the event of an actual scale-down rather than a transient
bounce.

KIP-812: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group]

  was:
In Kafka Streams, when an instance is shut down via the close() API, we 
intentionally skip sending a LeaveGroup request. This is because often the 
shutdown is not due to a scaling down event but instead some transient closure, 
such as during a rolling bounce. In cases where the instance is expected to 
start up again shortly after, we originally wanted to avoid having that member's
tasks redistributed across the remaining group members, since this would
disturb the stable assignment and could cause unnecessary state migration and 
restoration. We also hoped
to limit the disruption to just a single rebalance, rather than forcing the 
group to rebalance once when the member shuts down and then again when it comes 
back up. So it's really an optimization for the case in which the shutdown is
temporary.
 
That said, many of those optimizations are no longer necessary or at least much 
less useful given recent features and improvements. For example rebalances are 
now lightweight so skipping the 2nd rebalance is not as worth optimizing for, 
and the new assignor will take into account the actual underlying state for 
each task/partition assignment, rather than just the previous assignment, so 
the assignment should be considerably more stable across bounces and rolling 
restarts. 
 
Given that, it might be time to reconsider this optimization. Alternatively, we
could introduce another form of the close() API that forces the member to leave
the group, to be used in the event of an actual scale-down rather than a transient
bounce.


> Reconsider skipping the LeaveGroup on close() or add an overload that does so
> -
>
> Key: KAFKA-13217
> URL: https://issues.apache.org/jira/browse/KAFKA-13217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sayantanu Dey
>Priority: Major
>  Labels: kip, newbie, newbie++
> Fix For: 3.3.0
>
>
> In Kafka Streams, when an instance is shut down via the close() API, we 
> intentionally skip sending a LeaveGroup request. This is because often the 
> shutdown is not due to a scaling down event but instead some transient 
> closure, such as during a rolling bounce. In cases where the instance is 
> expected to start up again shortly after, we originally wanted to avoid having
> that member's tasks redistributed across the remaining group members, since
> this would disturb the stable assignment and could cause unnecessary
> state migration and restoration. We also hoped
> to limit the disruption to just a single rebalance, rather than forcing the 
> group to rebalance once when the member shuts down and then again when it 
> comes back up. So it's really an optimization for the case in which the
> shutdown is temporary.
>  
> That said, many of those optimizations are no longer necess

[jira] [Updated] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-07-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13217:

Labels: kip newbie newbie++  (was: needs-kip newbie newbie++)

> Reconsider skipping the LeaveGroup on close() or add an overload that does so
> -
>
> Key: KAFKA-13217
> URL: https://issues.apache.org/jira/browse/KAFKA-13217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sayantanu Dey
>Priority: Major
>  Labels: kip, newbie, newbie++
> Fix For: 3.3.0
>
>
> In Kafka Streams, when an instance is shut down via the close() API, we 
> intentionally skip sending a LeaveGroup request. This is because often the 
> shutdown is not due to a scaling down event but instead some transient 
> closure, such as during a rolling bounce. In cases where the instance is 
> expected to start up again shortly after, we originally wanted to avoid having
> that member's tasks redistributed across the remaining group members, since
> this would disturb the stable assignment and could cause unnecessary
> state migration and restoration. We also hoped
> to limit the disruption to just a single rebalance, rather than forcing the 
> group to rebalance once when the member shuts down and then again when it 
> comes back up. So it's really an optimization for the case in which the
> shutdown is temporary.
>  
> That said, many of those optimizations are no longer necessary or at least 
> much less useful given recent features and improvements. For example 
> rebalances are now lightweight so skipping the 2nd rebalance is not as worth 
> optimizing for, and the new assignor will take into account the actual 
> underlying state for each task/partition assignment, rather than just the 
> previous assignment, so the assignment should be considerably more stable 
> across bounces and rolling restarts. 
>  
> Given that, it might be time to reconsider this optimization. Alternatively,
> we could introduce another form of the close() API that forces the member to
> leave the group, to be used in the event of an actual scale-down rather than a
> transient bounce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14092) Unsafe memory access operation exception leading to Error processing append operation on partition

2022-07-20 Thread swathi mocharla (Jira)
swathi mocharla created KAFKA-14092:
---

 Summary: Unsafe memory access operation exception leading to Error 
processing append operation on partition
 Key: KAFKA-14092
 URL: https://issues.apache.org/jira/browse/KAFKA-14092
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.0.0
 Environment: K8S
Reporter: swathi mocharla


Hi,

We are frequently seeing an "Unsafe memory access operation" exception leading to
"Error processing append operation on partition" on a certain partition.

Here are some logs:
{code:java}
{"process":"kafka.Kafka","service":"eventqueue","host":"hubudgen10-ncs-1-workerbm-2","container":"7efcf75afb20","system":"vtas250","neid":"123456","timezone":"UTC","type":"log","level":"info","time":"2022-07-20T11:15:10.62400Z","log":{"msg":"[ProducerStateManager
 partition=cdr-group-000-ccf-000-mgmt-0] Wrote producer snapshot at offset 3114 
with 0 producer ids in 47 ms.","class":"kafka.log.ProducerStateManager"}}
{"process":"kafka.Kafka","service":"eventqueue","host":"hubudgen10-ncs-1-workerbm-2","container":"7efcf75afb20","system":"vtas250","neid":"123456","timezone":"UTC","type":"log","level":"error","time":"2022-07-20T11:15:10.62800Z","log":{"msg":"[ReplicaManager
 broker=1] Error processing append operation on partition 
cdr-group-000-ccf-000-mgmt-0java.lang.InternalError: a fault occurred in a 
recent unsafe memory access operation in compiled Java 
code\njava.io.UnixFileSystem.getBooleanAttributes0(Native 
Method)\njava.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242)\njava.io.File.exists(File.java:830)\nkafka.log.TransactionIndex.(TransactionIndex.scala:50)\nkafka.log.LogSegment$.open(LogSegment.scala:663)\nkafka.log.Log.$anonfun$roll$2(Log.scala:1692)\nkafka.log.Log.roll(Log.scala:2487)\nkafka.log.Log.maybeRoll(Log.scala:1632)\nkafka.log.Log.append(Log.scala:900)\nkafka.log.Log.appendAsLeader(Log.scala:741)\nkafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1042)\nkafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1030)\nkafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:931)\nscala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)\nscala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)\nscala.collection.mutable.HashMap.map(HashMap.scala:35)\nkafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:919)\nkafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:591)\nkafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:658)\nkafka.server.KafkaApis.handle(KafkaApis.scala:169)\nkafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)\njava.lang.Thread.run(Thread.java:750)","class":"kafka.server.ReplicaManager"}}{code}
{code:java}
{"process":"kafka.Kafka","service":"eventqueue","host":"hubudgen10-ncs-1-workerbm-2","container":"7efcf75afb20","system":"vtas250","neid":"123456","timezone":"UTC","type":"log","level":"warning","time":"2022-07-20T11:15:11.63200Z","log":{"msg":"[Log
 partition=cdr-group-000-ccf-000-mgmt-0, dir=/data] Newly rolled segment file 
/data/cdr-group-000-ccf-000-mgmt-0/3114.log already exists; 
deleting it first","class":"kafka.log.Log"}}
{"process":"kafka.Kafka","service":"eventqueue","host":"hubudgen10-ncs-1-workerbm-2","container":"7efcf75afb20","system":"vtas250","neid":"123456","timezone":"UTC","type":"log","level":"error","time":"2022-07-20T11:15:11.63300Z","log":{"msg":"[ReplicaManager
 broker=1] Error processing append operation on partition 
cdr-group-000-ccf-000-mgmt-0java.lang.IllegalStateException: Attempt to append 
a timestamp (1658315708834) to slot 1 no larger than the last timestamp 
appended (8694993961132949504) to 
/data/cdr-group-000-ccf-000-mgmt-0/3108.timeindex.\nkafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:128)\nkafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)\nkafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:503)\nkafka.log.Log.$anonfun$roll$8(Log.scala:1681)\nkafka.log.Log.$anonfun$roll$8$adapted(Log.scala:1681)\nscala.Option.foreach(Option.scala:437)\nkafka.log.Log.$anonfun$roll$2(Log.scala:1681)\nkafka.log.Log.roll(Log.scala:2487)\nkafka.log.Log.maybeRoll(Log.scala:1632)\nkafka.log.Log.append(Log.scala:900)\nkafka.log.Log.appendAsLeader(Log.scala:741)\nkafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1042)\nkafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1030)\nkafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:931)\nscala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)\nscala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)\nscala.collection.mutable.HashMap.map(HashMap.scala:35)\nkafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:919)\nkafka.server.ReplicaManager.appendRecords(Rep