[GitHub] [kafka] omkreddy merged pull request #12651: KAFKA-14212: Enhanced HttpAccessTokenRetriever to retrieve error mess…

2022-09-20 Thread GitBox


omkreddy merged PR #12651:
URL: https://github.com/apache/kafka/pull/12651





[jira] [Resolved] (KAFKA-14212) Fetch error response when hitting public OAuth/OIDC provider

2022-09-20 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-14212.
---
Fix Version/s: 3.4.0
   Resolution: Fixed

> Fetch error response when hitting public OAuth/OIDC provider
> 
>
> Key: KAFKA-14212
> URL: https://issues.apache.org/jira/browse/KAFKA-14212
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Sushant Mahajan
>Assignee: Sushant Mahajan
>Priority: Minor
> Fix For: 3.4.0
>
>
> The class 
> org.apache.kafka.common.security.oauthbearer.secured.HttpAccessTokenRetriever 
> is used to send client credentials to a public OAuth/OIDC provider and fetch 
> the response, possibly including the access token.
> However, if there is an error, the exact error message from the provider is 
> not currently retrieved.
> The error message can help the client easily diagnose whether a failure to 
> fetch the token is due to some misconfiguration on their side.
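
For illustration, a minimal sketch of how such an error body can be read from the provider's response (a hypothetical helper, not the actual HttpAccessTokenRetriever code, which wraps this in its own parsing and retry logic):

{code:java}
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;

public class OAuthErrorBodyReader {

    // Hypothetical helper: returns the provider's response body, including
    // the error payload for non-2xx responses.
    static String readBody(HttpURLConnection connection) throws Exception {
        int status = connection.getResponseCode();
        // For 4xx/5xx responses the payload is on the error stream,
        // not the input stream.
        InputStream in = status >= 400 ? connection.getErrorStream() : connection.getInputStream();
        if (in == null)
            return "";
        try (InputStream is = in) {
            return new String(is.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}
{code}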





[GitHub] [kafka] dajac commented on pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2022-09-20 Thread GitBox


dajac commented on PR #12590:
URL: https://github.com/apache/kafka/pull/12590#issuecomment-1252020391

   Perhaps a naive question, but does the fetch request to close the session fetch any records? Or does it just close the session and return?





[GitHub] [kafka] yashmayya commented on a diff in pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

2022-09-20 Thread GitBox


yashmayya commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r974127639


##
connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java:
##
@@ -24,20 +24,31 @@
 
 /**
  * Single message transformation for Kafka Connect record types.
- *
+ * <p>
  * Connectors can be configured with transformations to make lightweight 
message-at-a-time modifications.
  */
public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
 
+String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
+String FIELD_SYNTAX_VERSION_DOC = "Defines the version of the syntax to access fields. "
++ "If set to `V1`, then the field paths are limited to accessing elements at the root level of the struct or map. "
++ "If set to `V2`, the syntax will support accessing nested elements. To access nested elements, "
++ "dotted notation is used. If dots are already included in the field name, then backtick pairs "
++ "can be used to wrap field names containing dots. "
++ "e.g. to access elements from a struct/map named \"foo.bar\", "
++ "the following format can be used to access its elements: \"`foo.bar`.baz\".";

Review Comment:
   I think this should be `from a field in a struct/map named ...`  instead?



##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java:
##
@@ -39,33 +41,35 @@
 private static final String FIELD_CONFIG = "field";
 
 public static final ConfigDef CONFIG_DEF = new ConfigDef()
-.define(FIELD_CONFIG, ConfigDef.Type.STRING, 
ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to 
extract.");
+.define(FIELD_CONFIG, ConfigDef.Type.STRING, 
ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to 
extract.")
+.define(FIELD_SYNTAX_VERSION_CONFIG, ConfigDef.Type.STRING, 
FIELD_SYNTAX_VERSION_DEFAULT_VALUE, ConfigDef.Importance.HIGH, 
FIELD_SYNTAX_VERSION_DOC);

Review Comment:
   Could we add a validator here to ensure that `FIELD_SYNTAX_VERSION_CONFIG` 
belongs to `FieldSyntaxVersion.values()` (maybe case-insensitive match if 
desired)?
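
   One possible shape for such a validator, as a sketch (it assumes `FieldSyntaxVersion` is a plain enum and uses `ConfigDef.CaseInsensitiveValidString`, Kafka's standard helper for case-insensitive matching):

   ```java
   import java.util.Arrays;
   import org.apache.kafka.common.config.ConfigDef;

   // Sketch: reject any value that is not a FieldSyntaxVersion name (case-insensitive)
   public static final ConfigDef CONFIG_DEF = new ConfigDef()
       .define(FIELD_SYNTAX_VERSION_CONFIG, ConfigDef.Type.STRING,
           FIELD_SYNTAX_VERSION_DEFAULT_VALUE,
           ConfigDef.CaseInsensitiveValidString.in(
               Arrays.stream(FieldSyntaxVersion.values())
                   .map(Enum::name)
                   .toArray(String[]::new)),
           ConfigDef.Importance.HIGH, FIELD_SYNTAX_VERSION_DOC);
   ```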



##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value 
(e.g. Struct or
+ * Map).
+ * <p>
+ * It follows dotted notation to represent nested values.
+ * If field names contain dots, they can be escaped by wrapping the field names with backticks.
+ * If field names contain dots at wrapping positions (beginning or end of the path, before or after dots), then the
+ * backticks need to be escaped with a backslash.
+ * <p>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+private static final String BACKTICK = "`";
+private static final String DOT = ".";
+public static final char BACKTICK_CHAR = '`';
+public static final char DOT_CHAR = '.';
+public static final char BACKSLASH_CHAR = '\\';
+
+private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+private final String[] path;
+
+public static FieldPath ofV1(String field) {
+return of(field, FieldSyntaxVersion.V1);
+}
+
+public static FieldPath ofV2(String field) {
+return of(field, FieldSyntaxVersion.V2);
+}
+
+/**
+ * If version is V2, then paths are cached for further access.
+ *
+ * @param field field path expression
+ * @param version  field syntax version
+ */
+public static FieldPath of(St

[GitHub] [kafka] showuon commented on pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2022-09-20 Thread GitBox


showuon commented on PR #12590:
URL: https://github.com/apache/kafka/pull/12590#issuecomment-1252100858

   > Perhaps a naive question, but does the fetch request to close the session fetch any records? Or does it just close the session and return?
   
   @dajac, good question. When the consumer is closing, it leaves the group first and then closes the fetcher. I thought leaving the group would clear the owned partitions, but it looks like it won't. Maybe we need to update the broker side to not return records when the client is trying to close the session and not create a new one. @divijvaidya, WDYT?





[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975126734


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##
@@ -360,12 +339,11 @@ public void shouldRecycleStoreAndReregisterChangelog() {
 
 reset(context, store);
 context.uninitialize();

Review Comment:
   Done.






[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975129867


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -177,10 +181,7 @@ public void 
shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce
 
 @Test
 public void shouldTransitToRunningAfterInitialization() {
-
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-stateManager.registerStateStores(EasyMock.anyObject(), 
EasyMock.anyObject());
-
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-EasyMock.replay(stateManager);
+stateManager.registerStateStores(any(), any());

Review Comment:
   Done.






[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975140126


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -212,31 +212,29 @@ public void shouldThrowIfCommittingOnIllegalState() {
 @Test
 public void shouldAlwaysCheckpointStateIfEnforced() {
 stateManager.flush();
-EasyMock.expectLastCall().once();
+verify(stateManager, times(1)).flush();

Review Comment:
   Done.






[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975142135


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -212,31 +212,29 @@ public void shouldThrowIfCommittingOnIllegalState() {
 @Test
 public void shouldAlwaysCheckpointStateIfEnforced() {
 stateManager.flush();
-EasyMock.expectLastCall().once();
+verify(stateManager, times(1)).flush();
 stateManager.checkpoint();
-EasyMock.expectLastCall().once();
-
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-EasyMock.replay(stateManager);
+verify(stateManager, times(1)).checkpoint();
+
when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());

Review Comment:
   Done






[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975145693


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -212,31 +212,29 @@ public void shouldThrowIfCommittingOnIllegalState() {
 @Test
 public void shouldAlwaysCheckpointStateIfEnforced() {
 stateManager.flush();
-EasyMock.expectLastCall().once();
+verify(stateManager, times(1)).flush();
 stateManager.checkpoint();
-EasyMock.expectLastCall().once();
-
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-EasyMock.replay(stateManager);
+verify(stateManager, times(1)).checkpoint();
+
when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());
 
 task = createStandbyTask();
 
 task.initializeIfNeeded();
 task.maybeCheckpoint(true);
 
-EasyMock.verify(stateManager);
+verify(stateManager, atLeastOnce()).flush();

Review Comment:
   Done






[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975149899


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##
@@ -308,29 +300,21 @@ public void shouldRestoreTimestampedStoreWithConverter() {
 @Test
 public void shouldUnregisterChangelogsDuringClose() {
 final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
-reset(storeMetadata);
-final StateStore store = EasyMock.createMock(StateStore.class);
-
expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
-expect(storeMetadata.store()).andStubReturn(store);
-expect(store.name()).andStubReturn(persistentStoreName);
-
-context.uninitialize();
-store.init((StateStoreContext) context, store);
-replay(storeMetadata, context, store);
+final StateStore store = mock(StateStore.class);
+when(store.name()).thenReturn(persistentStoreName);
 
 stateMgr.registerStateStores(singletonList(store), context);
-verify(context, store);
+verify(context).uninitialize();
+verify(store).init((StateStoreContext) context, store);
 
 stateMgr.registerStore(store, noopStateRestoreCallback, null);
 
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
 
 reset(store);
-expect(store.name()).andStubReturn(persistentStoreName);
-store.close();
-replay(store);
+when(store.name()).thenReturn(persistentStoreName);

Review Comment:
   No it's not; I just removed this along with the `verify(store).close();` since it's pointless.






[GitHub] [kafka] mdedetrich commented on pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on PR #12524:
URL: https://github.com/apache/kafka/pull/12524#issuecomment-1252139067

   @cadonna Okay, so I have just pushed and rebased the PR with these changes:
   
   * As you rightly pointed out, there were a lot of things being stubbed/tested that made no sense due to peculiarities of how `EasyMock` works (which I am still trying to get my head around). I believe I have removed most of the unnecessary `reset`s/stubs. There are still some `verify` calls that could be argued to be pointless (mainly in the setup of the mock); if you still want to remove these, let me know.
   * `StreamTaskTest` previously used `EasyMock` stub exceptions to verify that a method was not meant to be called, i.e.
     ```java
     EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint"));
     ```
     This has now been replaced with `never`, i.e. the above example translates to `verify(stateManager, never()).checkpoint();` which
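
   A minimal self-contained sketch of that translation (hypothetical `StateManager` interface; only the `checkpoint()` call and the mock name come from the PR):

   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.never;
   import static org.mockito.Mockito.verify;

   public class NeverExample {
       interface StateManager { void checkpoint(); } // hypothetical stand-in

       public static void main(String[] args) {
           StateManager stateManager = mock(StateManager.class);
           // ... exercise the code under test, which must not checkpoint ...
           verify(stateManager, never()).checkpoint(); // fails if checkpoint() was ever called
       }
   }
   ```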





[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-09-20 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607083#comment-17607083
 ] 

Nicholas Telford commented on KAFKA-10635:
--

[~guozhang] is there any more information I can provide that might help zero in 
on this issue? If you have a PR that adds some additional logging, I could 
potentially patch it into my Kafka brokers if you're having trouble replicating 
this in development environments.

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
> Attachments: logs.csv
>
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
> at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)   
>  at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)   
>  at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)   
>  at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> {code}
> We see a corresponding error on the broker side:
> {code:java}
> [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error 
> processing append operation on partition 
> topic-name-Aggregation-repartition-52  
> (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException:
>  Out of order sequence number for producerId 2819098 at offset 1156041 in 
> partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), 
> -1 (current end sequence number)
> {code}
> We are able to reproduce this many times and it happens regardless of whether 
> the broker shutdown (at restart) is clean or unclean. However, when we 
> rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling 
> restarts, we don't see this error on the streams application at all. This is 
> blocking us from upgrading our broker

[GitHub] [kafka] cadonna merged pull request #12648: MINOR: Fixes in release.py

2022-09-20 Thread GitBox


cadonna merged PR #12648:
URL: https://github.com/apache/kafka/pull/12648





[GitHub] [kafka] divijvaidya commented on a diff in pull request #12465: KAFKA-12950: Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest

2022-09-20 Thread GitBox


divijvaidya commented on code in PR #12465:
URL: https://github.com/apache/kafka/pull/12465#discussion_r975245160


##
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java:
##
@@ -192,154 +209,141 @@ public void before() throws Exception {
 prepareStreams();
 }
 
+@After
+public void tearDown() {
+kafkaStreamsMockedStatic.close();
+clientMetricsMockedStatic.close();
+streamThreadMockedStatic.close();
+globalStreamThreadMockedConstruction.close();
+if (stateDirectoryMockedConstruction != null)
+stateDirectoryMockedConstruction.close();
+streamsConfigUtils.close();
+}
+
 private void prepareStreams() throws Exception {
 // setup metrics
-PowerMock.expectNew(Metrics.class,
-anyObject(MetricConfig.class),
-capture(metricsReportersCapture),
-anyObject(Time.class),
-anyObject(MetricsContext.class)
-).andAnswer(() -> {
+kafkaStreamsMockedStatic = mockStatic(KafkaStreams.class, 
withSettings()
+.defaultAnswer(InvocationOnMock::callRealMethod));
+kafkaStreamsMockedStatic.when(() -> KafkaStreams.createMetrics(
+any(MetricConfig.class),
+metricsReportersCapture.capture(),
+any(Time.class),
+any(MetricsContext.class)
+)).thenAnswer(invocation -> {

Review Comment:
   Thank you for the suggestion. It took a while, but I figured out a way to do what we wanted in the test without having to change any public API.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #12465: KAFKA-12950: Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest

2022-09-20 Thread GitBox


divijvaidya commented on code in PR #12465:
URL: https://github.com/apache/kafka/pull/12465#discussion_r975245954


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -990,7 +990,12 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
 return streamThread;
 }
 
-static Metrics getMetrics(final StreamsConfig config, final Time time, 
final String clientId) {
+// Ensures Mockito can stub this construction with a captured argument for KafkaStreamsTest.
+public static Metrics createMetrics(final MetricConfig metricConfig, final List<MetricsReporter> reporters, final Time time, final MetricsContext metricsContext) {
+    return new Metrics(metricConfig, reporters, time, metricsContext);
+}

Review Comment:
   I have changed this in the latest commit.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #12465: KAFKA-12950: Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest

2022-09-20 Thread GitBox


divijvaidya commented on code in PR #12465:
URL: https://github.com/apache/kafka/pull/12465#discussion_r975246480


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -252,10 +252,15 @@ State setState(final State newState) {
 
 public boolean isRunning() {
 synchronized (stateLock) {
-return state.isAlive();
+return isStateAlive();
 }
 }
 
+// Ensures Mockito can stub this method for KafkaStreamsTest.
+public boolean isStateAlive() {
+    return state.isAlive();
+}
+

Review Comment:
   This has been removed in the latest commit.






[GitHub] [kafka] cadonna commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


cadonna commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975189051


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##
@@ -310,48 +301,30 @@ public void shouldRestoreTimestampedStoreWithConverter() {
 @Test
 public void shouldUnregisterChangelogsDuringClose() {
 final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
-reset(storeMetadata);
-final StateStore store = EasyMock.createMock(StateStore.class);
-
expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
-expect(storeMetadata.store()).andStubReturn(store);
-expect(store.name()).andStubReturn(persistentStoreName);
-
-context.uninitialize();
-store.init((StateStoreContext) context, store);
-replay(storeMetadata, context, store);
+final StateStore store = mock(StateStore.class);
+when(store.name()).thenReturn(persistentStoreName);
 
 stateMgr.registerStateStores(singletonList(store), context);
-verify(context, store);
+verify(context).uninitialize();
+verify(store).init((StateStoreContext) context, store);
 
 stateMgr.registerStore(store, noopStateRestoreCallback, null);
 
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
 
-reset(store);
-expect(store.name()).andStubReturn(persistentStoreName);
-store.close();

Review Comment:
   You should account for this by putting `verify(store).close()` after line 
314. Or am I missing something?  



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -211,32 +205,22 @@ public void shouldThrowIfCommittingOnIllegalState() {
 
 @Test
 public void shouldAlwaysCheckpointStateIfEnforced() {
-stateManager.flush();

Review Comment:
   You missed verifying this call.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -211,32 +205,22 @@ public void shouldThrowIfCommittingOnIllegalState() {
 
 @Test
 public void shouldAlwaysCheckpointStateIfEnforced() {
-stateManager.flush();
-EasyMock.expectLastCall().once();
-stateManager.checkpoint();
-EasyMock.expectLastCall().once();
-
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-EasyMock.replay(stateManager);
+
when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());
 
 task = createStandbyTask();
 
 task.initializeIfNeeded();
 task.maybeCheckpoint(true);
 
-EasyMock.verify(stateManager);
+verify(stateManager).checkpoint();
 }
 
 @Test
 public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
-stateManager.flush();
-EasyMock.expectLastCall().once();
-stateManager.checkpoint();

Review Comment:
   You missed verifying this call.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() {
 
 final double expectedCloseTaskMetric = 1.0;
 verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
-
-EasyMock.verify(stateManager);
 }
 
 @Test
 public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-stateManager.close();
-EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
-EasyMock.replay(stateManager);
+doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
 task = createStandbyTask();
 task.initializeIfNeeded();
 
 task.suspend();
 task.closeDirty();
 
-EasyMock.verify(stateManager);
+verify(stateManager, atLeastOnce()).close();
 }
 
 @Test
 public void shouldSuspendAndCommitBeforeCloseClean() {
 stateManager.close();
-EasyMock.expectLastCall();
+verify(stateManager, atLeastOnce()).close();
 stateManager.checkpoint();

Review Comment:
   Remove this call on the mock. In Mockito it does not make sense.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -247,23 +231,19 @@ public void 
shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
 assertEquals(Collections.singletonMap(partition, 11000L), 
task.offsetSnapshotSinceLastFlush);
 task.maybeCheckpoint(false);  // this should not checkpoint
 assertEquals(Collections.singletonMap(partition, 11000L), 
task.offsetSnapshotSinceLastFlush);
-
-EasyMock.verify(stateManager);
 }
 
  

[GitHub] [kafka] mimaison commented on pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2022-09-20 Thread GitBox


mimaison commented on PR #11748:
URL: https://github.com/apache/kafka/pull/11748#issuecomment-1252285749

   @ryan-burningham This fix will be included in the upcoming 3.3.0 release.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #12465: KAFKA-12950: Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest

2022-09-20 Thread GitBox


divijvaidya commented on code in PR #12465:
URL: https://github.com/apache/kafka/pull/12465#discussion_r975301060


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1096,7 +1096,7 @@ private Optional removeStreamThread(final long 
timeoutMs) throws Timeout
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
 final boolean callingThreadIsNotCurrentStreamThread = 
!streamThread.getName().equals(Thread.currentThread().getName());
-if (streamThread.isAlive() && 
(callingThreadIsNotCurrentStreamThread || getNumLiveStreamThreads() == 1)) {

Review Comment:
   Note that `streamThread.isAlive()` is a `native` method, and Mockito currently does not support mocking `native` methods.
   
   I have made this change in the logic to remove the check because, even if the thread is not running, we should remove it from data structures such as `threads` and perform the cleanup of the Java object. Hence, the check is not really necessary.
   
   @cadonna let me know if you have better ideas here.
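
   For reference, a tiny illustration of that limitation (assuming Mockito's default mock maker, which cannot intercept final or native methods such as `Thread.isAlive()`):

   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;

   public class FinalMethodMockSketch {
       public static void main(String[] args) {
           Thread t = mock(Thread.class);
           // Thread.isAlive() is final (and native), so the mock cannot intercept it:
           // the real method runs, and the stubbing below fails with a
           // MissingMethodInvocationException under the default mock maker.
           when(t.isAlive()).thenReturn(true);
       }
   }
   ```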






[GitHub] [kafka] divijvaidya commented on a diff in pull request #12465: KAFKA-12950: Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest

2022-09-20 Thread GitBox


divijvaidya commented on code in PR #12465:
URL: https://github.com/apache/kafka/pull/12465#discussion_r975301980


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -1146,6 +1146,10 @@ private long advanceNowAndComputeLatency() {
 public void shutdown() {
 log.info("Informed to shut down");
 final State oldState = setState(State.PENDING_SHUTDOWN);
+
+// If the thread is already shutting down, this is a No-op.
+if (oldState == State.PENDING_SHUTDOWN || oldState == State.DEAD) 
return;

Review Comment:
   I would like to piggyback this change on this commit. Happy to open a new PR if you think that is required.






[GitHub] [kafka] akatona84 commented on pull request #11373: MINOR: Add shebang to gradlewAll

2022-09-20 Thread GitBox


akatona84 commented on PR #11373:
URL: https://github.com/apache/kafka/pull/11373#issuecomment-1252289779

   @omkreddy , @ijuma 
   could you take a look? pretty small change, pretty pls :)





[GitHub] [kafka] dajac merged pull request #12645: MINOR: Log controller id/epoch when LeaderAndIsr, StopReplica and UpdateMetadata requests are fenced

2022-09-20 Thread GitBox


dajac merged PR #12645:
URL: https://github.com/apache/kafka/pull/12645






[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975397584


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##
@@ -360,14 +333,8 @@ public void shouldRecycleStoreAndReregisterChangelog() {
 
assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
 assertThat(stateMgr.getStore(persistentStoreName), equalTo(store));
 
-reset(context, store);
-context.uninitialize();

Review Comment:
   So counts are absolute, which means that modifying `verify(context).uninitialize();` to `verify(context, times(2)).uninitialize()` is not necessary. I did add `verify(context).uninitialize();` on line 336.






[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975412340


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -211,32 +205,22 @@ public void shouldThrowIfCommittingOnIllegalState() {
 
 @Test
 public void shouldAlwaysCheckpointStateIfEnforced() {
-stateManager.flush();

Review Comment:
   Done.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -177,11 +179,6 @@ public void 
shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce
 
 @Test
 public void shouldTransitToRunningAfterInitialization() {
-
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-stateManager.registerStateStores(EasyMock.anyObject(), 
EasyMock.anyObject());

Review Comment:
   Yes, this part of `EasyMock` I understand; the problem is that a lot of the tests had stubbings that were unnecessary, which was being picked up by `MockitoJUnitRunner.StrictStubs.class`. On top of this, `EasyMock.verify(stateManager)` has no equivalent in Mockito, since it's considered bad practice to just verify any interaction on a class (Mockito forces you to specify which method is being called, not just `EasyMock.verify(someClass)`). Since `EasyMock.verify(someClass)` was typically placed at the bottom of the test methods, it was hard to tell what exactly was expected of the mock, and in some cases it was pointless.
   
   Evidently I made some mistakes, so I have put back `stateManager.registerStateStores(any(), any());`; however, note that `stateManager.registerStateStores()` is being called twice, so I used `atLeastOnce`.
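
   For reference, a minimal sketch of how strict stubbing surfaces unnecessary stubs (hypothetical interface and test class, not the actual `StandbyTaskTest`):

   ```java
   import java.util.Collections;
   import java.util.Map;

   import org.junit.Test;
   import org.junit.runner.RunWith;
   import org.mockito.Mock;
   import org.mockito.junit.MockitoJUnitRunner;

   import static org.mockito.Mockito.when;

   @RunWith(MockitoJUnitRunner.StrictStubs.class)
   public class StrictStubsSketch {
       interface StateManager { Map<String, Long> changelogOffsets(); }

       @Mock private StateManager stateManager;

       @Test
       public void unusedStubFailsTheTest() {
           // Under StrictStubs, this test fails with UnnecessaryStubbingException
           // because the stubbed method is never invoked by any code under test.
           when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());
       }
   }
   ```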






[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975392538


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##
@@ -310,48 +301,30 @@ public void shouldRestoreTimestampedStoreWithConverter() {
 @Test
 public void shouldUnregisterChangelogsDuringClose() {
 final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
-reset(storeMetadata);
-final StateStore store = EasyMock.createMock(StateStore.class);
-
expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
-expect(storeMetadata.store()).andStubReturn(store);
-expect(store.name()).andStubReturn(persistentStoreName);
-
-context.uninitialize();
-store.init((StateStoreContext) context, store);
-replay(storeMetadata, context, store);
+final StateStore store = mock(StateStore.class);
+when(store.name()).thenReturn(persistentStoreName);
 
 stateMgr.registerStateStores(singletonList(store), context);
-verify(context, store);
+verify(context).uninitialize();
+verify(store).init((StateStoreContext) context, store);
 
 stateMgr.registerStore(store, noopStateRestoreCallback, null);
 
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
 
-reset(store);
-expect(store.name()).andStubReturn(persistentStoreName);
-store.close();

Review Comment:
   Adding `verify(store).close()` at this point causes the test to fail, because `store.close()` has not been executed yet.






[GitHub] [kafka] cadonna commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


cadonna commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975414658


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##
@@ -310,48 +301,30 @@ public void shouldRestoreTimestampedStoreWithConverter() {
 @Test
 public void shouldUnregisterChangelogsDuringClose() {
 final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
-reset(storeMetadata);
-final StateStore store = EasyMock.createMock(StateStore.class);
-
expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
-expect(storeMetadata.store()).andStubReturn(store);
-expect(store.name()).andStubReturn(persistentStoreName);
-
-context.uninitialize();
-store.init((StateStoreContext) context, store);
-replay(storeMetadata, context, store);
+final StateStore store = mock(StateStore.class);
+when(store.name()).thenReturn(persistentStoreName);
 
 stateMgr.registerStateStores(singletonList(store), context);
-verify(context, store);
+verify(context).uninitialize();
+verify(store).init((StateStoreContext) context, store);
 
 stateMgr.registerStore(store, noopStateRestoreCallback, null);
 
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
 
-reset(store);
-expect(store.name()).andStubReturn(persistentStoreName);
-store.close();

Review Comment:
   Then I do not understand why the test passed before. I suppose there is a 
mistake in the test setup.






[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975414772


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() {
 
 final double expectedCloseTaskMetric = 1.0;
 verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
-
-EasyMock.verify(stateManager);
 }
 
 @Test
 public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-stateManager.close();
-EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
-EasyMock.replay(stateManager);
+doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
 task = createStandbyTask();
 task.initializeIfNeeded();
 
 task.suspend();
 task.closeDirty();
 
-EasyMock.verify(stateManager);
+verify(stateManager, atLeastOnce()).close();
 }
 
 @Test
 public void shouldSuspendAndCommitBeforeCloseClean() {
 stateManager.close();
-EasyMock.expectLastCall();
+verify(stateManager, atLeastOnce()).close();
 stateManager.checkpoint();

Review Comment:
   Done






[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975415669


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() {
 
 final double expectedCloseTaskMetric = 1.0;
 verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
-
-EasyMock.verify(stateManager);
 }
 
 @Test
 public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-stateManager.close();
-EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
-EasyMock.replay(stateManager);
+doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
 task = createStandbyTask();
 task.initializeIfNeeded();
 
 task.suspend();
 task.closeDirty();
 
-EasyMock.verify(stateManager);
+verify(stateManager, atLeastOnce()).close();
 }
 
 @Test
 public void shouldSuspendAndCommitBeforeCloseClean() {
 stateManager.close();

Review Comment:
   Done.






[GitHub] [kafka] divijvaidya commented on pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2022-09-20 Thread GitBox


divijvaidya commented on PR #12590:
URL: https://github.com/apache/kafka/pull/12590#issuecomment-1252416795

   > does the fetch request to close the session fetch any records
   
   No, because the fetch request's field for topic partitions is set to empty at `sessionHandler.newBuilder().build()` (line 1963 in Fetcher.java). Also note that the empty fetch data in the close-fetch request is asserted in the test `testFetcherCloseClosesFetchSessionsInBroker` via `assertTrue(builder.fetchData().isEmpty());`.
   
   On the server side, a close-fetch request is handled by creating a `SessionlessFetchContext`, which returns an empty response if the fetch data is empty (see FetchSession.scala line 364).
   
   @showuon @dajac Please let me know if I am missing anything here.





[jira] [Created] (KAFKA-14244) Prevent unit/integration tests from terminating JVM

2022-09-20 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14244:
-

 Summary: Prevent unit/integration tests from terminating JVM
 Key: KAFKA-14244
 URL: https://issues.apache.org/jira/browse/KAFKA-14244
 Project: Kafka
  Issue Type: Improvement
  Components: unit tests
Reporter: Chris Egerton
Assignee: Chris Egerton


We've seen several cases of unit/integration tests invoking {{System::exit}} in 
the past, which terminates the CI build immediately and blocks us from 
collecting results for any not-yet-run tests.

In most cases, it's simple enough to use the appropriate wrapper class for 
calls to {{System::exit}} (which is available in both 
[Java|https://github.com/apache/kafka/blob/ff4c4d1365b189131a4f31b2dc49853f874c3ea5/clients/src/main/java/org/apache/kafka/common/utils/Exit.java]
 and 
[Scala|https://github.com/apache/kafka/blob/ff4c4d1365b189131a4f31b2dc49853f874c3ea5/core/src/main/scala/kafka/utils/Exit.scala]),
 but if the issue cannot be reproduced locally and instead only occurs during 
CI builds, it becomes difficult to identify exactly which test is causing the 
JVM to terminate.
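
As a sketch of what the Java wrapper enables in a test, using its {{setExitProcedure}}/{{resetExitProcedure}} hooks:

{code:java}
import org.apache.kafka.common.utils.Exit;

public class ExitGuardSketch {
    public static void main(String[] args) {
        // Route any Exit.exit(...) call to an exception instead of terminating
        // the JVM, so the offending test fails visibly rather than killing the build.
        Exit.setExitProcedure((statusCode, message) -> {
            throw new AssertionError("Unexpected exit(" + statusCode + "): " + message);
        });
        try {
            Exit.exit(1); // a test calling this now throws instead of exiting
        } finally {
            Exit.resetExitProcedure(); // restore the default behavior
        }
    }
}
{code}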

In addition, there are some cases where, even though care is taken to use the 
correct wrapper class during testing, threads that are leaked during the test 
end up attempting to terminate the JVM after the wrapper class has been reset, 
which causes those attempts to actually succeed. See KAFKA-14242 for an example 
of one such test.

We can explore one or more of these potential improvements:
 * Always prevent calls to {{System::exit}} from succeeding during tests, both 
locally and in CI
 * Attempt to report which tests are responsible for invoking {{System::exit}} 
directly, so that they can be patched
 * Fail the build when {{System::exit}} is invoked by a test, in order to 
surface the issue (which can either be indicative of poor testing logic or 
a genuine bug) during CI builds







[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975440743


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() {
 
 final double expectedCloseTaskMetric = 1.0;
 verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
-
-EasyMock.verify(stateManager);
 }
 
 @Test
 public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-stateManager.close();
-EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
-EasyMock.replay(stateManager);
+doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
 task = createStandbyTask();
 task.initializeIfNeeded();
 
 task.suspend();
 task.closeDirty();
 
-EasyMock.verify(stateManager);
+verify(stateManager, atLeastOnce()).close();
 }
 
 @Test
 public void shouldSuspendAndCommitBeforeCloseClean() {
 stateManager.close();
-EasyMock.expectLastCall();
+verify(stateManager, atLeastOnce()).close();
 stateManager.checkpoint();
-EasyMock.expectLastCall().once();
-EasyMock.expect(stateManager.changelogOffsets())
-.andReturn(Collections.singletonMap(partition, 60L));
-
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();

Review Comment:
   This was another case where I didn't know what 
`EasyMock.verify(stateManager)` at the end of the method was actually verifying.
   
   I placed 
   
   ```java
   when(stateManager.changelogOffsets())
   .thenReturn(Collections.singletonMap(partition, 60L));
   ```
   
   back and verified `verify(stateManager).changelogOffsets();`. Note that Mockito reported
   ```java
   when(stateManager.changelogPartitions())
   .thenReturn(Collections.singleton(partition));
   ```
   as unnecessary due to `verify(stateManager).changelogPartitions()` never 
actually being executed in the test run.
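   
   For anyone following the migration, a schematic sketch of the semantic 
difference at play (a hypothetical test method reusing this file's fixture 
names, not code from the PR): EasyMock treats every recorded call as an 
expectation that a single `verify(mock)` checks, while Mockito stubs are 
lenient and each interaction has to be verified explicitly.
   
   ```java
   import static org.mockito.Mockito.verify;
   import static org.mockito.Mockito.verifyNoMoreInteractions;
   import static org.mockito.Mockito.when;
   
   @Test
   public void sketchOfMockitoVerification() {
       // A stub is not an expectation: nothing fails if it is never used.
       when(stateManager.changelogOffsets())
           .thenReturn(Collections.singletonMap(partition, 60L));
   
       task.maybeCheckpoint(true); // exercise the code under test
   
       // Verification is per interaction, unlike EasyMock's all-at-once verify(mock).
       verify(stateManager).changelogOffsets();
       verifyNoMoreInteractions(stateManager); // optional EasyMock-like strictness
   }
   ```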



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975469281


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() {
 
 final double expectedCloseTaskMetric = 1.0;
 verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
-
-EasyMock.verify(stateManager);
 }
 
 @Test
 public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-stateManager.close();
-EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
-EasyMock.replay(stateManager);
+doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
 task = createStandbyTask();
 task.initializeIfNeeded();
 
 task.suspend();
 task.closeDirty();
 
-EasyMock.verify(stateManager);
+verify(stateManager, atLeastOnce()).close();
 }
 
 @Test
 public void shouldSuspendAndCommitBeforeCloseClean() {
 stateManager.close();
-EasyMock.expectLastCall();
+verify(stateManager, atLeastOnce()).close();

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975472755


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -211,32 +205,22 @@ public void shouldThrowIfCommittingOnIllegalState() {
 
 @Test
 public void shouldAlwaysCheckpointStateIfEnforced() {
-stateManager.flush();
-EasyMock.expectLastCall().once();
-stateManager.checkpoint();
-EasyMock.expectLastCall().once();
-
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-EasyMock.replay(stateManager);
+
when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());
 
 task = createStandbyTask();
 
 task.initializeIfNeeded();
 task.maybeCheckpoint(true);
 
-EasyMock.verify(stateManager);
+verify(stateManager).checkpoint();
 }
 
 @Test
 public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
-stateManager.flush();

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975478357


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##
@@ -310,48 +301,30 @@ public void shouldRestoreTimestampedStoreWithConverter() {
 @Test
 public void shouldUnregisterChangelogsDuringClose() {
 final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
-reset(storeMetadata);
-final StateStore store = EasyMock.createMock(StateStore.class);
-
expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
-expect(storeMetadata.store()).andStubReturn(store);
-expect(store.name()).andStubReturn(persistentStoreName);
-
-context.uninitialize();
-store.init((StateStoreContext) context, store);
-replay(storeMetadata, context, store);
+final StateStore store = mock(StateStore.class);
+when(store.name()).thenReturn(persistentStoreName);
 
 stateMgr.registerStateStores(singletonList(store), context);
-verify(context, store);
+verify(context).uninitialize();
+verify(store).init((StateStoreContext) context, store);
 
 stateMgr.registerStore(store, noopStateRestoreCallback, null);
 
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
 
-reset(store);
-expect(store.name()).andStubReturn(persistentStoreName);
-store.close();

Review Comment:
   So if I use the `Mockito.reset` to simulate what the original `EasyMock` was 
doing, i.e.
   
   ```java
   reset(store);
   when(store.name()).thenReturn(persistentStoreName);
   store.close();
   ```
   
   Then the test passes. Note that you told me to remove this beforehand (see 
https://github.com/apache/kafka/pull/12524#discussion_r968477415)
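   
   For reference, a small generic sketch of what `Mockito.reset` does (not the 
PR's code; the Mockito docs generally recommend creating a fresh mock instead 
of resetting):
   
   ```java
   import static org.mockito.Mockito.*;
   
   StateStore store = mock(StateStore.class);
   when(store.name()).thenReturn("persistentStore"); // phase-one stubbing
   store.name();                                     // recorded invocation
   
   reset(store);                // drops all stubbing AND all recorded invocations
   verifyNoInteractions(store); // passes: the history was cleared by reset
   when(store.name()).thenReturn("persistentStore"); // must re-stub after the reset
   ```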



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on PR #12524:
URL: https://github.com/apache/kafka/pull/12524#issuecomment-1252484624

   @cadonna I have addressed the comments where I could. I just noticed that I 
had gone back and forth between removing and adding the same stubs/verifications 
in various places without realizing it, due to force pushing, so I pushed each 
of the changes as an individual commit to better track them. If you want me to 
squash the PR, let me know.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-20 Thread GitBox


mdedetrich commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975464527


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() {
 
 final double expectedCloseTaskMetric = 1.0;
 verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
-
-EasyMock.verify(stateManager);
 }
 
 @Test
 public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-stateManager.close();
-EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
-EasyMock.replay(stateManager);
+doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
 task = createStandbyTask();
 task.initializeIfNeeded();
 
 task.suspend();
 task.closeDirty();
 
-EasyMock.verify(stateManager);
+verify(stateManager, atLeastOnce()).close();
 }
 
 @Test
 public void shouldSuspendAndCommitBeforeCloseClean() {

Review Comment:
   I am unsure what needs to be verified in order? The current version of the 
test is 
   
   ```java
   @Test
   public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
   doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
   
   task = createStandbyTask();
   task.initializeIfNeeded();
   
   task.suspend();
   task.closeDirty();
   }
   ```
   
   And from what I can tell it's testing that the `RuntimeException` is never 
thrown when `task.closeDirty()` is called (`task.closeDirty()` indirectly calls 
`stateManager.close()`). The only mock is the `doThrow`, so there is nothing to 
really order?
   
   I can add `verify(stateManager).close();` at the end of the test but even 
then there is only a single `.verify` statement.
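   
   If pinning the commit-before-close order ever becomes necessary here, 
Mockito's `InOrder` would be the tool; a hedged sketch using this test's names:
   
   ```java
   import org.mockito.InOrder;
   import static org.mockito.Mockito.inOrder;
   
   InOrder inOrder = inOrder(stateManager);
   inOrder.verify(stateManager).checkpoint(); // must be observed before...
   inOrder.verify(stateManager).close();      // ...the close() call
   ```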



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #12628: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.

2022-09-20 Thread GitBox


mumrah commented on code in PR #12628:
URL: https://github.com/apache/kafka/pull/12628#discussion_r975497294


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -530,54 +545,14 @@ static AuthorizationResult findResult(Action action,
 }
 
 Iterable<AclBinding> acls(AclBindingFilter filter) {

Review Comment:
   Let's add a javadoc to this method indicating that we are returning a copy 
of the AclBinding-s
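   
   Something along these lines might do (a sketch of the suggested javadoc, not 
the final wording):
   
   ```java
   /**
    * Returns the ACL bindings that match the given filter.
    *
    * <p>The returned Iterable is a copy built while the ACLs are read, so it
    * stays valid and consistent even if the authorizer's ACLs change after
    * this method returns.
    */
   Iterable<AclBinding> acls(AclBindingFilter filter) {
       // ... existing implementation ...
   }
   ```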



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -193,15 +194,29 @@ StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclE
 loadingComplete,
 superUsers,
 defaultRule.result,
-new ConcurrentSkipListSet<>(),
-new ConcurrentHashMap<>());
+new TreeSet<>(),
+new HashMap<>());
 for (Entry<Uuid, StandardAcl> entry : aclEntries) {
 newData.addAcl(entry.getKey(), entry.getValue());
 }
 log.info("Applied {} acl(s) from image.", aclEntries.size());
 return newData;
 }
 
+StandardAuthorizerData copyWithNewAcls(TreeSet<StandardAcl> aclsByResource, HashMap<Uuid, StandardAcl> aclsById) {
...
 private final CompletableFuture<Void> initialLoadFuture = new CompletableFuture<>();
 
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
 /**
- * The current data. Can be read without a lock. Must be written while 
holding the object lock.
+ * The current data. We use a read-write lock to synchronize reads and 
writes to the data.

Review Comment:
   Let's mention that we expect a single writer and multiple readers for this 
class and that the locks are here to ensure consistency (i.e., the issue you 
found)



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -129,23 +167,37 @@ public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
 public List<AuthorizationResult> authorize(
 AuthorizableRequestContext requestContext,
 List<Action> actions) {
-StandardAuthorizerData curData = data;
 List<AuthorizationResult> results = new ArrayList<>(actions.size());
-for (Action action: actions) {
-AuthorizationResult result = curData.authorize(requestContext, 
action);
-results.add(result);
+lock.readLock().lock();
+try {
+for (Action action : actions) {
+AuthorizationResult result = data.authorize(requestContext, 
action);
+results.add(result);
+}
+} finally {
+lock.readLock().unlock();
 }
 return results;
 }
 
 @Override
 public Iterable<AclBinding> acls(AclBindingFilter filter) {
-return data.acls(filter);
+lock.readLock().lock();
+try {
+return data.acls(filter);

Review Comment:
   Can you add a comment here that the returned `Iterable` is consistent?



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -129,23 +167,37 @@ public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
 public List<AuthorizationResult> authorize(
 AuthorizableRequestContext requestContext,
 List<Action> actions) {
-StandardAuthorizerData curData = data;
 List<AuthorizationResult> results = new ArrayList<>(actions.size());
-for (Action action: actions) {
-AuthorizationResult result = curData.authorize(requestContext, 
action);
-results.add(result);
+lock.readLock().lock();
+try {
+for (Action action : actions) {
+AuthorizationResult result = data.authorize(requestContext, 
action);

Review Comment:
   Even in the lock, we should still load `data` into a local variable to avoid 
loading the volatile multiple times
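   
   A sketch of the suggested pattern, adapted from the diff above (not the 
final code):
   
   ```java
   lock.readLock().lock();
   try {
       // Single volatile read; every action in the loop sees the same snapshot.
       StandardAuthorizerData curData = data;
       for (Action action : actions) {
           results.add(curData.authorize(requestContext, action));
       }
   } finally {
       lock.readLock().unlock();
   }
   ```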



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

2022-09-20 Thread GitBox


jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r975521064


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java:
##
@@ -39,33 +41,35 @@
 private static final String FIELD_CONFIG = "field";
 
 public static final ConfigDef CONFIG_DEF = new ConfigDef()
-.define(FIELD_CONFIG, ConfigDef.Type.STRING, 
ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to 
extract.");
+.define(FIELD_CONFIG, ConfigDef.Type.STRING, 
ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to 
extract.")
+.define(FIELD_SYNTAX_VERSION_CONFIG, ConfigDef.Type.STRING, 
FIELD_SYNTAX_VERSION_DEFAULT_VALUE, ConfigDef.Importance.HIGH, 
FIELD_SYNTAX_VERSION_DOC);

Review Comment:
   Great idea! adding validator and constants to `FieldSyntaxVersion`
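   
   A possible shape for it (a sketch only; names follow this PR's 
`FieldSyntaxVersion`, and the exact API is up to the implementation):
   
   ```java
   import org.apache.kafka.common.config.ConfigDef;
   
   public enum FieldSyntaxVersion {
       V1, V2;
   
       public static final String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
   
       // Attach the config, its default, and a validator limited to the enum names.
       public static ConfigDef appendConfigTo(final ConfigDef configDef) {
           return configDef.define(
               FIELD_SYNTAX_VERSION_CONFIG,
               ConfigDef.Type.STRING,
               V1.name(), // default to V1 for backward compatibility
               ConfigDef.ValidString.in(V1.name(), V2.name()),
               ConfigDef.Importance.HIGH,
               "Syntax version used to interpret field paths (V1 = root level only, V2 = nested).");
       }
   }
   ```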



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

2022-09-20 Thread GitBox


jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r975521554


##
connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java:
##
@@ -24,20 +24,31 @@
 
 /**
  * Single message transformation for Kafka Connect record types.
- *
+ * 
  * Connectors can be configured with transformations to make lightweight 
message-at-a-time modifications.
  */
 public interface Transformation> extends 
Configurable, Closeable {
 
+String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
+String FIELD_SYNTAX_VERSION_DOC = "Defines the version of the syntax to 
access fields. "
++ "If set to `V1`, then the field paths are limited to access the 
elements at the root level of the struct or map."
++ "If set to `V2`, the syntax will support accessing nested elements. 
o access nested elements, "
++ "dotted notation is used. If dots are already included in the field 
name, then backtick pairs "
++ "can be used to wrap field names containing dots. "
++ "e.g. to access elements from a struct/map named \"foo.bar\", "
++ "the following format can be used to access its elements: 
\"`foo.bar`.baz\".";
+
+String FIELD_SYNTAX_VERSION_DEFAULT_VALUE = "V1";

Review Comment:
   Agree, moving it to `FieldSyntaxVersion`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] der-eismann opened a new pull request, #12665: Fix typo in info message

2022-09-20 Thread GitBox


der-eismann opened a new pull request, #12665:
URL: https://github.com/apache/kafka/pull/12665

   Minor change to a log line that had a typo in it
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante opened a new pull request, #12666: KAFKA-14244: Add guard against accidental calls to halt JVM during testing

2022-09-20 Thread GitBox


C0urante opened a new pull request, #12666:
URL: https://github.com/apache/kafka/pull/12666

   [Jira](https://issues.apache.org/jira/browse/KAFKA-14244)
   
   Uses an [automatically-registered JUnit 5 
extension](https://junit.org/junit5/docs/snapshot/user-guide/#extensions-registration-automatic)
 that installs a 
[SecurityManager](https://docs.oracle.com/javase/8/docs/api/java/lang/SecurityManager.html)
 which intercepts attempts to terminate the JVM that appear to have come from 
test cases, and then reports those attempts by causing tests to fail. If the 
test that attempted to terminate the JVM is running at the time of the attempt, 
that test will fail. If not, a random test will fail at a later point (in order 
to fail the CI build and surface the issue), with a message reporting that a 
prior test appears to have leaked a thread which has attempted to terminate the 
JVM after that test completed.
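   
   The core interception idea looks roughly like this (a minimal sketch, not 
the PR's actual implementation):
   
   ```java
   // SecurityManager.checkExit is consulted by both System.exit and Runtime.halt,
   // so throwing here turns a JVM-termination attempt into a test failure.
   public class NoExitSecurityManager extends SecurityManager {
       @Override
       public void checkExit(final int status) {
           throw new SecurityException("Test attempted to terminate the JVM (status " + status + ")");
       }
   
       @Override
       public void checkPermission(final java.security.Permission perm) {
           // Permit everything else; this guard only targets JVM termination.
       }
   }
   ```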
   
   The 
[InheritableThreadLocal](https://docs.oracle.com/javase/8/docs/api/java/lang/InheritableThreadLocal.html)
 class is used to track which tests attempt to terminate the JVM, including 
cases where the attempt occurs in a separate thread, and even when the attempt 
has occurred after the test has already completed.
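   
   As a rough illustration of that mechanism (a generic sketch, not the PR's 
code):
   
   ```java
   // Child threads copy the parent's value at construction time, so work spawned
   // by a test carries the test's identity even after the test has finished.
   static final InheritableThreadLocal<String> CURRENT_TEST = new InheritableThreadLocal<>();
   
   CURRENT_TEST.set("SomeTest.shouldDoThing"); // set by the extension when a test starts
   Thread worker = new Thread(() -> {
       String origin = CURRENT_TEST.get();     // still "SomeTest.shouldDoThing" here
       // an exit attempt on this thread can now be attributed to `origin`
   });
   worker.start();
   ```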
   
   Although the `SecurityManager` API was deprecated and slated for removal in 
[JEP-411](https://openjdk.org/jeps/411), there is no existing alternative for 
intercepting attempts to terminate the JVM (see 
https://bugs.openjdk.org/browse/JDK-8199704). Once an alternative becomes 
available, we can use it instead of the `SecurityManager` API.
   
   This is heavily inspired by the 
[system-rules](https://github.com/stefanbirkner/system-rules) library, with 
some modifications to support parallel testing.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

2022-09-20 Thread GitBox


jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r975536673


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value 
(e.g. Struct or
+ * Map).
+ * 
+ * It follows a dotted notation to represent nested values.
+ * If field names contain dots, they can be escaped by wrapping the field names
+ * with backticks.
+ * If field names contain backticks at wrapping positions (beginning or end of
+ * the path, before or after dots), those backticks need to be
+ * escaped with a backslash.
+ * 
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+private static final String BACKTICK = "`";
+private static final String DOT = ".";
+public static final char BACKTICK_CHAR = '`';
+public static final char DOT_CHAR = '.';
+public static final char BACKSLASH_CHAR = '\\';
+
+private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+private final String[] path;
+
+public static FieldPath ofV1(String field) {
+return of(field, FieldSyntaxVersion.V1);
+}
+
+public static FieldPath ofV2(String field) {
+return of(field, FieldSyntaxVersion.V2);
+}
+
+/**
+ * If version is V2, then paths are cached for further access.
+ *
+ * @param field field path expression
+ * @param version  field syntax version
+ */
+public static FieldPath of(String field, FieldSyntaxVersion version) {
+if (field == null || field.isEmpty() || 
version.equals(FieldSyntaxVersion.V1)) {
+return new FieldPath(field, version);
+} else {
+if (PATHS_CACHE.containsKey(field)) {
+return PATHS_CACHE.get(field);
+} else {
+final FieldPath fieldPath = new FieldPath(field, version);
+PATHS_CACHE.put(field, fieldPath);
+return fieldPath;
+}
+}
+}
+
+FieldPath(String path, FieldSyntaxVersion version) {
+if (path == null || path.isEmpty()) { // empty path
+this.path = new String[] {};
+} else {
+switch (version) {
+case V1: // backward compatibility
+this.path = new String[] {path};
+break;
+case V2:
+// if no dots or wrapping backticks are used, then return 
path with single step
+if (!path.contains(DOT)
+&& !(path.startsWith(BACKTICK) && path.endsWith(
+BACKTICK))) {
+this.path = new String[] {path};
+} else {
+// prepare for tracking path steps
+final List<String> steps = new ArrayList<>();
+final StringBuilder s = new StringBuilder(
+path); // avoid creating new string on changes
+
+while (s.length() > 0) { // until the path is traversed
+// process backtick pair if any
+if (s.charAt(0) == BACKTICK_CHAR) {
+s.deleteCharAt(0);
+
+// find backtick pair
+int idx = 0;
+while (idx >= 0) {
+idx = s.indexOf(BACKTICK, idx);
+if (idx == -1) {
+throw new IllegalArgumentExcepti

[GitHub] [kafka] C0urante commented on pull request #12639: KAFKA-14242: do not init managers twice to avoid resource leak

2022-09-20 Thread GitBox


C0urante commented on PR #12639:
URL: https://github.com/apache/kafka/pull/12639#issuecomment-1252547139

   @showuon @ijuma I've taken a stab at a safeguard to prevent tests from 
accidentally terminating the JVM, and to accurately report where those calls 
for termination come from. If you're interested, the PR can be found 
[here](https://github.com/apache/kafka/pull/12666).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12666: KAFKA-14244: Add guard against accidental calls to halt JVM during testing

2022-09-20 Thread GitBox


C0urante commented on code in PR #12666:
URL: https://github.com/apache/kafka/pull/12666#discussion_r975538761


##
build.gradle:
##
@@ -472,7 +475,7 @@ subprojects {
 useJUnitPlatform {
   includeTags "integration"
   includeTags "org.apache.kafka.test.IntegrationTest"
- // Both engines are needed to run JUnit 4 tests alongside JUnit 5 
tests.
+  // Both engines are needed to run JUnit 4 tests alongside JUnit 5 
tests.

Review Comment:
   Unrelated whitespace fix; this line was not aligned with others in my IDE.



##
build.gradle:
##
@@ -520,7 +525,7 @@ subprojects {
 useJUnitPlatform {
   excludeTags "integration"
   excludeTags "org.apache.kafka.test.IntegrationTest"
- // Both engines are needed to run JUnit 4 tests alongside JUnit 5 
tests.
+  // Both engines are needed to run JUnit 4 tests alongside JUnit 5 
tests.

Review Comment:
   Unrelated whitespace fix; this line was not aligned with others in my IDE.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12643: KAFKA-14097: make producer ID expiration a dynamic config

2022-09-20 Thread GitBox


dajac commented on code in PR #12643:
URL: https://github.com/apache/kafka/pull/12643#discussion_r975566769


##
core/src/main/scala/kafka/log/ProducerStateManager.scala:
##
@@ -939,3 +940,24 @@ object SnapshotFile {
 SnapshotFile(file, offset)
   }
 }
+
+class ProducerStateManagerConfig(@volatile var producerIdExpirationMs: Int) 
extends Logging with BrokerReconfigurable {
+
+  override def reconfigurableConfigs: Set[String] = 
Set(KafkaConfig.ProducerIdExpirationMsProp)
+
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
+if (producerIdExpirationMs != newConfig.producerIdExpirationMs) {
+  info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from 
$producerIdExpirationMs to ${newConfig.producerIdExpirationMs}")
+  producerIdExpirationMs = newConfig.producerIdExpirationMs
+}
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+if (newConfig.producerIdExpirationMs < 0)
+  throw new ConfigException(s"${KafkaConfig.ProducerIdExpirationMsProp} 
cannot be less than 0, current value is $producerIdExpirationMs")
+  }
+}
+
+object ProducerStateManagerConfig {
+  val ReconfigurableConfigs = Set(KafkaConfig.ProducerIdExpirationMsProp)

Review Comment:
   Is it used anywhere? It does not seem to be.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #12643: KAFKA-14097: make producer ID expiration a dynamic config

2022-09-20 Thread GitBox


dajac commented on PR #12643:
URL: https://github.com/apache/kafka/pull/12643#issuecomment-1252583284

   @jolshan Left a small comment. Could you rebase it as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

2022-09-20 Thread GitBox


jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r975569257


##
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathTest.java:
##
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.junit.jupiter.api.Test;
+
+class FieldPathTest {
+final static String[] EMPTY_PATH = new String[]{};
+
+@Test void shouldBuildV1WithDotsAndBacktickPair() {
+assertArrayEquals(new String[] {"foo.bar.baz"}, 
FieldPath.ofV1("foo.bar.baz").path());
+assertArrayEquals(new String[] {"foo.`bar.baz`"}, 
FieldPath.ofV1("foo.`bar.baz`").path());
+}
+
+@Test void shouldBuildV2WithEmptyPath() {
+assertArrayEquals(EMPTY_PATH, FieldPath.of("", 
FieldSyntaxVersion.V2).path());
+}
+
+@Test void shouldBuildV2WithNullPath() {
+assertArrayEquals(EMPTY_PATH, FieldPath.of(null, 
FieldSyntaxVersion.V2).path());
+}
+
+@Test void shouldBuildV2WithoutDots() {
+assertArrayEquals(new String[] {"foobarbaz"}, 
FieldPath.of("foobarbaz", FieldSyntaxVersion.V2).path());
+}
+@Test void shouldBuildV2WithoutWrappingBackticks() {
+assertArrayEquals(new String[] {"foo`bar`baz"}, 
FieldPath.of("foo`bar`baz", FieldSyntaxVersion.V2).path());
+}
+
+@Test void shouldBuildV2WhenIncludesDots() {
+assertArrayEquals(new String[] {"foo", "bar", "baz"}, 
FieldPath.of("foo.bar.baz", FieldSyntaxVersion.V2).path());
+}
+
+@Test void shouldBuildV2WhenIncludesDotsAndBacktickPair() {
+assertArrayEquals(new String[] {"foo", "bar.baz"}, 
FieldPath.of("foo.`bar.baz`", FieldSyntaxVersion.V2).path());
+assertArrayEquals(new String[] {"foo", "bar", "baz"}, 
FieldPath.of("foo.`bar`.baz", FieldSyntaxVersion.V2).path());
+}
+
+@Test void shouldBuildV2AndIgnoreBackticksThatAreNotWrapping() {
+assertArrayEquals(new String[] {"foo", "ba`r.baz"}, 
FieldPath.of("foo.`ba`r.baz`", FieldSyntaxVersion.V2).path());
+assertArrayEquals(new String[] {"foo", "ba`r", "baz"}, 
FieldPath.of("foo.ba`r.baz", FieldSyntaxVersion.V2).path());
+}
+
+@Test void shouldBuildV2AndEscapeBackticks() {
+assertArrayEquals(new String[] {"foo", "bar`.`baz"}, 
FieldPath.of("foo.`bar\\`.\\`baz`", FieldSyntaxVersion.V2).path());
+assertArrayEquals(new String[] {"foo", "bar\\`.`baz"}, 
FieldPath.of("foo.`bar`.\\`baz`", FieldSyntaxVersion.V2).path());
+}
+
+@Test void shouldBuildV2WithBackticksWrappingBackticks() {

Review Comment:
   Good catch. I will stick to the rules in this case; there is no need for 
this special case, since backticks can be escaped with a backslash.
   Tests are updated.
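   
   To recap the resulting rules, a usage sketch based on the tests above 
(expected step arrays shown in the comments):
   
   ```java
   // Dots separate path steps; a backtick pair keeps dots literal inside a step;
   // a backslash escapes a backtick that would otherwise close the pair.
   FieldPath.of("foo.bar.baz", FieldSyntaxVersion.V2);         // ["foo", "bar", "baz"]
   FieldPath.of("foo.`bar.baz`", FieldSyntaxVersion.V2);       // ["foo", "bar.baz"]
   FieldPath.of("foo.`bar\\`.\\`baz`", FieldSyntaxVersion.V2); // ["foo", "bar`.`baz"]
   FieldPath.ofV1("foo.bar.baz");                              // ["foo.bar.baz"] (legacy single step)
   ```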



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12643: KAFKA-14097: make producer ID expiration a dynamic config

2022-09-20 Thread GitBox


jolshan commented on code in PR #12643:
URL: https://github.com/apache/kafka/pull/12643#discussion_r975585267


##
core/src/main/scala/kafka/log/ProducerStateManager.scala:
##
@@ -939,3 +940,24 @@ object SnapshotFile {
 SnapshotFile(file, offset)
   }
 }
+
+class ProducerStateManagerConfig(@volatile var producerIdExpirationMs: Int) 
extends Logging with BrokerReconfigurable {
+
+  override def reconfigurableConfigs: Set[String] = 
Set(KafkaConfig.ProducerIdExpirationMsProp)
+
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
+if (producerIdExpirationMs != newConfig.producerIdExpirationMs) {
+  info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from 
$producerIdExpirationMs to ${newConfig.producerIdExpirationMs}")
+  producerIdExpirationMs = newConfig.producerIdExpirationMs
+}
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+if (newConfig.producerIdExpirationMs < 0)
+  throw new ConfigException(s"${KafkaConfig.ProducerIdExpirationMsProp} 
cannot be less than 0, current value is $producerIdExpirationMs")
+  }
+}
+
+object ProducerStateManagerConfig {
+  val ReconfigurableConfigs = Set(KafkaConfig.ProducerIdExpirationMsProp)

Review Comment:
   This is required to override the class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12643: KAFKA-14097: make producer ID expiration a dynamic config

2022-09-20 Thread GitBox


jolshan commented on code in PR #12643:
URL: https://github.com/apache/kafka/pull/12643#discussion_r975586125


##
core/src/main/scala/kafka/log/ProducerStateManager.scala:
##
@@ -939,3 +940,24 @@ object SnapshotFile {
 SnapshotFile(file, offset)
   }
 }
+
+class ProducerStateManagerConfig(@volatile var producerIdExpirationMs: Int) 
extends Logging with BrokerReconfigurable {
+
+  override def reconfigurableConfigs: Set[String] = 
Set(KafkaConfig.ProducerIdExpirationMsProp)
+
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
+if (producerIdExpirationMs != newConfig.producerIdExpirationMs) {
+  info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from 
$producerIdExpirationMs to ${newConfig.producerIdExpirationMs}")
+  producerIdExpirationMs = newConfig.producerIdExpirationMs
+}
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+if (newConfig.producerIdExpirationMs < 0)
+  throw new ConfigException(s"${KafkaConfig.ProducerIdExpirationMsProp} 
cannot be less than 0, current value is $producerIdExpirationMs")
+  }
+}
+
+object ProducerStateManagerConfig {
+  val ReconfigurableConfigs = Set(KafkaConfig.ProducerIdExpirationMsProp)

Review Comment:
   It is also used in core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
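   
   For completeness: once the config is wired into `DynamicBrokerConfig`, it 
becomes updatable at runtime, e.g. via the Admin API. A hedged sketch (config 
name per this PR / KIP-854; bootstrap address assumed):
   
   ```java
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.AlterConfigOp;
   import org.apache.kafka.clients.admin.ConfigEntry;
   import org.apache.kafka.common.config.ConfigResource;
   
   import java.util.Collections;
   
   try (Admin admin = Admin.create(Collections.singletonMap(
           AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, (Object) "localhost:9092"))) {
       // An empty resource name targets the cluster-wide broker default.
       ConfigResource brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "");
       AlterConfigOp setOp = new AlterConfigOp(
           new ConfigEntry("producer.id.expiration.ms", "86400000"),
           AlterConfigOp.OpType.SET);
       admin.incrementalAlterConfigs(
               Collections.singletonMap(brokerDefaults, Collections.singletonList(setOp)))
           .all().get(); // blocks until the brokers apply the change
   }
   ```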



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on pull request #12577: KAFKA-13401: KIP-787 - MM2 manage Kafka resources with custom Admin implementation.

2022-09-20 Thread GitBox


OmniaGM commented on PR #12577:
URL: https://github.com/apache/kafka/pull/12577#issuecomment-1252621680

   @mimaison,  @tombentley can one of you please have a look into this PR? 
Thanks 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14243) Temporarily disable unsafe downgrade

2022-09-20 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-14243:
---
Fix Version/s: 3.3.0

> Temporarily disable unsafe downgrade
> 
>
> Key: KAFKA-14243
> URL: https://issues.apache.org/jira/browse/KAFKA-14243
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 3.3.0
>
>
> Temporarily disable unsafe downgrade since we haven't implemented reloading 
> snapshots on unsafe downgrade



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] akhileshchg commented on a diff in pull request #12628: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.

2022-09-20 Thread GitBox


akhileshchg commented on code in PR #12628:
URL: https://github.com/apache/kafka/pull/12628#discussion_r975614289


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -193,15 +194,29 @@ StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclE
 loadingComplete,
 superUsers,
 defaultRule.result,
-new ConcurrentSkipListSet<>(),
-new ConcurrentHashMap<>());
+new TreeSet<>(),
+new HashMap<>());
 for (Entry<Uuid, StandardAcl> entry : aclEntries) {
 newData.addAcl(entry.getKey(), entry.getValue());
 }
 log.info("Applied {} acl(s) from image.", aclEntries.size());
 return newData;
 }
 
+StandardAuthorizerData copyWithNewAcls(TreeSet<StandardAcl> aclsByResource, HashMap<Uuid, StandardAcl>

[GitHub] [kafka] guozhangwang opened a new pull request, #12667: MINOR: Improve logging with throwing OOOSE

2022-09-20 Thread GitBox


guozhangwang opened a new pull request, #12667:
URL: https://github.com/apache/kafka/pull/12667

   We need the logging to include the full information on how the sequence 
numbers / epochs are inferred.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12667: MINOR: Improve logging with throwing OOOSE

2022-09-20 Thread GitBox


guozhangwang commented on code in PR #12667:
URL: https://github.com/apache/kafka/pull/12667#discussion_r975630163


##
core/src/main/scala/kafka/log/ProducerStateManager.scala:
##
@@ -216,12 +216,12 @@ private[log] class ProducerAppendInfo(val topicPartition: 
TopicPartition,
 
   private def checkSequence(producerEpoch: Short, appendFirstSeq: Int, offset: 
Long): Unit = {
 if (producerEpoch != updatedEntry.producerEpoch) {
-  if (appendFirstSeq != 0) {
-if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
-  throw new OutOfOrderSequenceException(s"Invalid sequence number for 
new epoch of producer $producerId " +
-s"at offset $offset in partition $topicPartition: $producerEpoch 
(request epoch), $appendFirstSeq (seq. number), " +
-s"${updatedEntry.producerEpoch} (current producer epoch)")
-}
+  if (appendFirstSeq != 0 && updatedEntry.producerEpoch != 
RecordBatch.NO_PRODUCER_EPOCH) {

Review Comment:
   Consolidating nested if-s



##
core/src/main/scala/kafka/log/ProducerStateManager.scala:
##
@@ -233,10 +233,16 @@ private[log] class ProducerAppendInfo(val topicPartition: 
TopicPartition,
 
   // If there is no current producer epoch (possibly because all producer 
records have been deleted due to
   // retention or the DeleteRecords API) accept writes with any sequence 
number
-  if (!(currentEntry.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || 
inSequence(currentLastSeq, appendFirstSeq))) {
+  if (currentEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH && 
!inSequence(currentLastSeq, appendFirstSeq)) {

Review Comment:
   Unfolding the conditions (by De Morgan's law, `!(p || q)` is equivalent to 
`!p && !q`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-09-20 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607329#comment-17607329
 ] 

Guozhang Wang commented on KAFKA-10635:
---

Could you try out this patch https://github.com/apache/kafka/pull/12667 and 
reproduce the issue?

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
> Attachments: logs.csv
>
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
> at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)   
>  at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)   
>  at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)   
>  at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> {code}
> We see a corresponding error on the broker side:
> {code:java}
> [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error 
> processing append operation on partition 
> topic-name-Aggregation-repartition-52  
> (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException:
>  Out of order sequence number for producerId 2819098 at offset 1156041 in 
> partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), 
> -1 (current end sequence number)
> {code}
> We are able to reproduce this many times and it happens regardless of whether 
> the broker shutdown (at restart) is clean or unclean. However, when we 
> rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling 
> restarts, we don't see this error on the streams application at all. This is 
> blocking us from upgrading our broker version. 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-09-20 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607329#comment-17607329
 ] 

Guozhang Wang edited comment on KAFKA-10635 at 9/20/22 5:20 PM:


Could you try out this patch https://github.com/apache/kafka/pull/12667 and 
reproduce the issue, and collect the logs again?


was (Author: guozhang):
Could you try out this patch https://github.com/apache/kafka/pull/12667 and 
reproduce the issue?

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
> Attachments: logs.csv
>
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
> at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)   
>  at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)   
>  at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)   
>  at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> {code}
> We see a corresponding error on the broker side:
> {code:java}
> [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error 
> processing append operation on partition 
> topic-name-Aggregation-repartition-52  
> (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException:
>  Out of order sequence number for producerId 2819098 at offset 1156041 in 
> partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), 
> -1 (current end sequence number)
> {code}
> We are able to reproduce this many times and it happens regardless of whether 
> the broker shutdown (at restart) is clean or unclean. However, when we 
> rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling 
> restarts, we don't see this error on the streams application at all. This is 
> blocking us from 

[GitHub] [kafka] hachikuji commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

2022-09-20 Thread GitBox


hachikuji commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975638603


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;

Review Comment:
   Would it make sense for the handlers to go in `internals/events` as well?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The 
client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with 
type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {

Review Comment:
   I wonder if the generics are needed here. Do we anticipate other types?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRequestEvent.java:
##
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+abstract public class ConsumerRequestEvent {

Review Comment:
   Can you clarify what qualifies as a "request event" vs "response event"? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

2022-09-20 Thread GitBox


philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975641032


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The 
client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with 
type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {

Review Comment:
   That's a good point, I think I was trying to make it a generic handler but 
we don't really need it at this point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

2022-09-20 Thread GitBox


philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975641608


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;

Review Comment:
   Sounds good, I was actually thinking about the same thing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

2022-09-20 Thread GitBox


philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975642128


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRequestEvent.java:
##
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+abstract public class ConsumerRequestEvent {

Review Comment:
   I'll add java docs around it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12566: KAFKA-13927: Only clear origOffsets when clearing messageBatch

2022-09-20 Thread GitBox


C0urante commented on code in PR #12566:
URL: https://github.com/apache/kafka/pull/12566#discussion_r975650210


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##
@@ -435,6 +453,13 @@ public void testPollRedelivery() throws Exception {
 assertTaskMetricValue("batch-size-max", 1.0);
 assertTaskMetricValue("batch-size-avg", 0.5);
 
+assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));

Review Comment:
   Yeah, I think the assertions you added above cover this case nicely 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #12566: KAFKA-13927: Only clear origOffsets when clearing messageBatch

2022-09-20 Thread GitBox


C0urante merged PR #12566:
URL: https://github.com/apache/kafka/pull/12566


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14245) Topic deleted during reassignment

2022-09-20 Thread Junyang Liu (Jira)
Junyang Liu created KAFKA-14245:
---

 Summary: Topic deleted during reassignment
 Key: KAFKA-14245
 URL: https://issues.apache.org/jira/browse/KAFKA-14245
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.7.1
Reporter: Junyang Liu


While a partition reassignment is in progress, the topic should not be deletable. 
However, the method "markTopicIneligibleForDeletion" only marks topics that are 
already present in "controllerContext.topicsToBeDeleted", which is usually not 
the case at that point because no deletion has been requested yet. As a result, 
topics undergoing reassignment are never added to "topicsIneligibleForDeletion", 
so when a topic deletion request arrives later, a topic still in reassignment can 
be deleted, and the reassignment can never finish.
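
A condensed model of the guard being described (hypothetical Java; the actual 
controller code is Scala and differs in detail):

{code:java}
import java.util.HashSet;
import java.util.Set;

public class ReassignmentDeletionRace {
    static final Set<String> topicsToBeDeleted = new HashSet<>();
    static final Set<String> topicsIneligibleForDeletion = new HashSet<>();

    // Models markTopicIneligibleForDeletion: it is a no-op unless the topic
    // is already queued for deletion.
    static void markTopicIneligibleForDeletion(String topic) {
        if (topicsToBeDeleted.contains(topic)) {
            topicsIneligibleForDeletion.add(topic);
        }
    }

    public static void main(String[] args) {
        String topic = "t1";
        markTopicIneligibleForDeletion(topic); // reassignment starts: no-op
        topicsToBeDeleted.add(topic);          // deletion requested mid-reassignment
        // The topic was never protected, so it can be deleted while the
        // reassignment is still in flight:
        System.out.println(topicsIneligibleForDeletion.contains(topic)); // prints false
    }
}
{code}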



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14246) Refactor KafkaConsumer Threading Model

2022-09-20 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14246:
--

 Summary: Refactor KafkaConsumer Threading Model
 Key: KAFKA-14246
 URL: https://issues.apache.org/jira/browse/KAFKA-14246
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


Hi community,

 

We are refactoring the current KafkaConsumer and making it more asynchronous.  
This is the proposal's placeholder; subtasks will be linked to this ticket.  
Please review the design document and feel free to use this thread for 
discussion. 

 

The design document is here: 
[https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor]

 

The original email thread is here: 
https://lists.apache.org/thread/13jvwzkzmb8c6t7drs4oj2kgkjzcn52l

 

I will continue to update the 1pager as reviews and comments come.

 

Thanks, 

P



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14247) Implement EventHandler interface and DefaultEventHandler

2022-09-20 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14247:
--

 Summary: Implement EventHandler interface and DefaultEventHandler
 Key: KAFKA-14247
 URL: https://issues.apache.org/jira/browse/KAFKA-14247
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


The polling thread uses events to communicate with the background thread.  The 
events sent to the background thread are the {_}Requests{_}, and the events 
sent from the background thread to the polling thread are the {_}Responses{_}.

 

Here we have an EventHandler interface and a DefaultEventHandler 
implementation.  The implementation uses two blocking queues to send events 
both ways.  The two methods, add and poll, allow the client, i.e., the polling 
thread, to retrieve and add events to the handler.
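
A minimal sketch of the two-queue idea (the generic parameter names and class 
shapes here are illustrative; see the PR for the actual code):

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// T is the request type (polling thread -> background thread),
// K is the response type (background thread -> polling thread).
interface EventHandler<T, K> {
    K poll();             // retrieve a response, if one is available
    boolean add(T event); // hand a request over to the background thread
}

class DefaultEventHandler<T, K> implements EventHandler<T, K> {
    private final BlockingQueue<T> requestQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<K> responseQueue = new LinkedBlockingQueue<>();

    @Override
    public K poll() {
        return responseQueue.poll(); // non-blocking; returns null when empty
    }

    @Override
    public boolean add(T event) {
        return requestQueue.offer(event);
    }

    // A background thread would drain requestQueue, do the network work,
    // and offer any resulting events into responseQueue.
}
{code}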

 

PR: https://github.com/apache/kafka/pull/12663



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

2022-09-20 Thread GitBox


C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r975671233


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
 leaderAssignment = deserializeAssignment(result, leaderId);
 assertAssignment(leaderId, offset,
 Collections.emptyList(), 0,
-Collections.emptyList(), 0,
+Collections.emptyList(), 1,

Review Comment:
   👍 Thanks. I wish we could do this without adding an extra boolean flag but 
preventing regressions seems worth the additional complexity, especially since 
there are other issues with the allocation algorithm that need to be worked out 
and, when fixed, will hopefully render the flag unnecessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

2022-09-20 Thread GitBox


C0urante commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r975672702


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
 leaderAssignment = deserializeAssignment(result, leaderId);
 assertAssignment(leaderId, offset,
 Collections.emptyList(), 0,
-Collections.emptyList(), 0,
+Collections.emptyList(), 1,

Review Comment:
   Are there any cases where the introduction of the `canRevoke` flag will 
defeat the purpose of this PR, by causing an imbalanced assignment to be sent 
out with no guaranteed follow-up rebalances (such as those caused by a 
scheduled rebalance delay, or workers rejoining the cluster after receiving an 
assignment with revocations)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14246) Refactor KafkaConsumer Threading Model

2022-09-20 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-14246:
---
Description: 
Hi community,

 

We are refactoring the current KafkaConsumer and making it more asynchronous.  
This is the master Jira to track the project's progress; subtasks will be 
linked to this ticket.  Please review the design document and feel free to use 
this thread for discussion. 

 

The design document is here: 
[https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor]

 

The original email thread is here: 
[https://lists.apache.org/thread/13jvwzkzmb8c6t7drs4oj2kgkjzcn52l]

 

I will continue to update the 1pager as reviews and comments come.

 

Thanks, 

P

  was:
Hi community,

 

We are refactoring the current KafkaConsumer and making it more asynchronous.  
This is the proposal's placeholder; subtasks will be linked to this ticket.  
Please review the design document and feel free to use this thread for 
discussion. 

 

The design document is here: 
[https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor]

 

The original email thread is here: 
https://lists.apache.org/thread/13jvwzkzmb8c6t7drs4oj2kgkjzcn52l

 

I will continue to update the 1pager as reviews and comments come.

 

Thanks, 

P


> Refactor KafkaConsumer Threading Model
> --
>
> Key: KAFKA-14246
> URL: https://issues.apache.org/jira/browse/KAFKA-14246
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>
> Hi community,
>  
> We are refactoring the current KafkaConsumer and making it more asynchronous. 
>  This is the master Jira to track the project's progress; subtasks will be 
> linked to this ticket.  Please review the design document and feel free to 
> use this thread for discussion. 
>  
> The design document is here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor]
>  
> The original email thread is here: 
> [https://lists.apache.org/thread/13jvwzkzmb8c6t7drs4oj2kgkjzcn52l]
>  
> I will continue to update the 1pager as reviews and comments come.
>  
> Thanks, 
> P



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kirktrue commented on pull request #12651: KAFKA-14212: Enhanced HttpAccessTokenRetriever to retrieve error mess…

2022-09-20 Thread GitBox


kirktrue commented on PR #12651:
URL: https://github.com/apache/kafka/pull/12651#issuecomment-1252765578

   Nice work, @smjn! Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji opened a new pull request, #12668: MINOR: Mention deprecation of authorizer flags in security documentation

2022-09-20 Thread GitBox


hachikuji opened a new pull request, #12668:
URL: https://github.com/apache/kafka/pull/12668

   The `--authorizer` and `--authorizer-properties` options are deprecated 
since 3.2. This patch updates the security documentation to mention the 
deprecation and changes examples to use `--bootstrap-server` instead.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

2022-09-20 Thread GitBox


guozhangwang commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975715686


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The 
client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with 
type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {
+public K poll();
+public boolean add(T event);

Review Comment:
   From our offline discussions, I think there are two channels for 
communicating "responses" back: 1) the caller sends a request along with a 
future, and then waits on the future either synchronously or asynchronously; in 
this case it does not expect to "poll" for the response; 2) the caller sends a 
request, and then polls for responses either synchronously or asynchronously.
   
   For commitSync, for example, since the caller only cares about a specific 
response, it may not be appropriate to poll for responses, since a lot of other 
responses the caller does not care about would get polled.
   
   Could we add some javadocs clarifying which scenarios will leverage each 
of the two?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRequestEvent.java:
##
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+abstract public class ConsumerRequestEvent {

Review Comment:
   Per my other comment above: for requests, do we also want to always include a 
future?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #12664: KAFKA-14243: Temporarily disable unsafe downgrade

2022-09-20 Thread GitBox


mumrah merged PR #12664:
URL: https://github.com/apache/kafka/pull/12664


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

2022-09-20 Thread GitBox


guozhangwang commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975715686


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The 
client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with 
type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {
+public K poll();
+public boolean add(T event);

Review Comment:
   From our offline discussions, I think there are two channels for 
communicating "responses" back: 1) the caller sends a request along with a 
future, and then waits on the future either synchronously or asynchronously; in 
this case it does not expect to "poll" for the response; 2) the caller does not 
have a specific response in mind, and just wants to poll for any responses that 
become available and react to them.
   
   For commitSync, for example, since the caller only cares about a specific 
response, it may not be appropriate to poll for responses, since a lot of other 
responses the caller does not care about would get polled; hence it would 
likely fall into the first case above.
   
   Could we add some javadocs clarifying which scenarios will leverage each 
of the two?
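   
   For illustration, channel 1) could look like a request event that carries 
its own future, so commitSync can block on exactly its own result instead of 
polling a shared response queue (the names here are hypothetical, not the PR's 
code):
   
   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   
   class CommitRequestEvent {
       final Map<String, Long> offsets; // simplified stand-in for per-partition offsets
       final CompletableFuture<Void> future = new CompletableFuture<>();
   
       CommitRequestEvent(Map<String, Long> offsets) {
           this.offsets = offsets;
       }
   }
   
   // caller:            eventHandler.add(event); event.future.get(); // waits for this commit only
   // background thread: completes event.future when the commit response arrives
   ```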



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #12628: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.

2022-09-20 Thread GitBox


mumrah merged PR #12628:
URL: https://github.com/apache/kafka/pull/12628


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] forlack commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation

2022-09-20 Thread GitBox


forlack commented on code in PR #12642:
URL: https://github.com/apache/kafka/pull/12642#discussion_r972377166


##
docs/ops.html:
##
@@ -3180,6 +3180,119 @@ 6.10 KRaft
+
+  Configuration
+
+  Process Roles
+
+  In KRaft mode each Kafka server can be configured as a controller, as a 
broker or as both using the process.roles property. This property 
can have the following values:
+
+  
+If process.roles is set to broker, the 
server acts as a broker.
+If process.roles is set to controller, the 
server acts as a controller.
+If process.roles is set to 
broker,controller, the server acts as a broker and a 
controller.

Review Comment:
   Consider "operates" instead of "acts", i.e., "the server operates as a controller."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] forlack commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation

2022-09-20 Thread GitBox


forlack commented on code in PR #12642:
URL: https://github.com/apache/kafka/pull/12642#discussion_r972393499


##
docs/ops.html:
##
@@ -3180,6 +3180,119 @@ 6.10 KRaft
+
+  Configuration
+
+  Process Roles
+
+  In KRaft mode each Kafka server can be configured as a controller, as a 
broker or as both using the process.roles property. This property 
can have the following values:
+
+  
+If process.roles is set to broker, the 
server acts as a broker.
+If process.roles is set to controller, the 
server acts as a controller.
+If process.roles is set to 
broker,controller, the server acts as a broker and a 
controller.
+If process.roles is not set at all, it is assumed to be 
in ZooKeeper mode.
+  
+
+  Nodes that act as both brokers and controllers are referred to as 
"combined" nodes. Combined nodes are simpler to operate for simple use cases 
like a development environment. The key disadvantage is that the controller 
will be less isolated from the rest of the system. Combined mode is not 
recommended in critical deployment environments.
+
+
+  Controllers
+
+  In KRaft mode, only a small group of specially selected servers can act 
as controllers (unlike the ZooKeeper-based mode, where any server can become 
the Controller). The specially selected controller servers will participate in 
the metadata quorum. Each controller server is either active, or a hot standby 
for the current active controller server.
+
+  A Kafka cluster will typically select 3 or 5 servers for this role, 
depending on factors like cost and the number of concurrent failures your 
system should withstand without availability impact. A majority of the 
controllers must be alive in order to maintain availability. With 3 
controllers, the cluster can tolerate 1 controller failure; with 5 controllers, 
the cluster can tolerate 2 controller failures.
+
+  All of the servers in a Kafka cluster discover the quorum voters using 
the controller.quorum.voters property. This identifies the quorum 
controller servers that should be used. All the controllers must be enumerated. 
Each controller is identified with their id, host and 
port information. This is an example configuration: 
controller.quorum.voters=id1@host1:port1,id2@host2:port2,id3@host3:port3
+
+  If the Kafka cluster has 3 controllers named controller1, controller2 and 
controller3, then controller1 may have the following:
+
+  
+process.roles=controller
+node.id=1
+listeners=CONTROLLER://controller1.example.com:9093
+controller.quorum.voters=1...@controller1.example.com:9093,2...@controller2.example.com:9093,3...@controller3.example.com:9093
+
+  Every broker and controller must set the 
controller.quorum.voters property. The node ID supplied in the 
controller.quorum.voters property must match the corresponding id 
on the controller servers. For example, on controller1, node.id must be set to 
1, and so forth. Each node ID must be unique across all the nodes in a 
particular cluster. No two nodes can have the same node ID regardless of their 
process.roles values.
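
For example, a broker in the same cluster might be configured as follows (the 
listener name and port are illustrative):

process.roles=broker
node.id=4
listeners=PLAINTEXT://broker4.example.com:9092
controller.quorum.voters=1@controller1.example.com:9093,2@controller2.example.com:9093,3@controller3.example.com:9093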
+
+  Storage Tool
+  
+  The kafka-storage.sh random-uuid command can be used to 
generate a cluster ID for your new cluster. This cluster ID must be used when 
formatting each node in the cluster with the kafka-storage.sh 
format command.
+
+  This is different from how Kafka has operated in the past. Previously, 
Kafka would format blank storage directories automatically, and also generate a 
new cluster ID automatically. One reason for the change is that auto-formatting 
can sometimes obscure an error condition. This is particularly important for 
the metadata log maintained by the controller and broker servers. If a majority 
of the controllers were able to start with an empty log directory, a leader 
might be able to be elected with missing committed data.
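
For example, to generate a cluster ID and then format a node with it (the 
properties file path is illustrative):

> bin/kafka-storage.sh random-uuid
> bin/kafka-storage.sh format -t uuid-from-previous-step -c /path/to/server.properties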
+
+  Debugging
+
+  Metadata Quorum 
Tool
+
+  The kafka-metadata-quorum tool can be used to describe the 
runtime state of the cluster metadata partition. For example, the following 
command displays a summary of the metadata quorum:
+
+> 
bin/kafka-metadata-quorum.sh --bootstrap-server  broker_host:port describe 
--status
+ClusterId:  fMCL8kv1SWm87L_Md-I2hg
+LeaderId:   3002
+LeaderEpoch:2
+HighWatermark:  10
+MaxFollowerLag: 0
+MaxFollowerLagTimeMs:   -1
+CurrentVoters:  [3000,3001,3002]
+CurrentObservers:   [0,1,2]
+
+  Dump Log Tool
+
+  The kafka-dump-log tool can be used to debug the log 
segments and snapshots for the cluster metadata directory. The tool will scan 
the provided files and decode the metadata records. For example, this command 
decodes and prints the records in the first log segment:
+
+> 
bin/kafka-dump-log.sh --cluster-metadata-decoder --skip-record-metadata --files 
metadata_log_dir/__cluster_metadata-0/.log
+
+  This command decodes and prints the records in a cluster metadata 
snapshot:
+
+> 
bin/kafka-dump-

[GitHub] [kafka] forlack commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation

2022-09-20 Thread GitBox


forlack commented on code in PR #12642:
URL: https://github.com/apache/kafka/pull/12642#discussion_r975807518


##
docs/ops.html:
##
@@ -3180,6 +3180,119 @@ 6.10 KRaft
+
+  Configuration
+
+  Process Roles
+
+  In KRaft mode each Kafka server can be configured as a controller, as a 
broker or as both using the process.roles property. This property 
can have the following values:

Review Comment:
   ```suggestion
 In KRaft mode each Kafka server can be configured as a controller, a 
broker, or both using the process.roles property. This property can 
have the following values:
   ```



##
docs/ops.html:
##
@@ -3180,6 +3180,119 @@ 6.10 KRaft
+
+  Configuration
+
+  Process Roles
+
+  In KRaft mode each Kafka server can be configured as a controller, as a 
broker or as both using the process.roles property. This property 
can have the following values:
+
+  
+If process.roles is set to broker, the 
server acts as a broker.
+If process.roles is set to controller, the 
server acts as a controller.
+If process.roles is set to 
broker,controller, the server acts as a broker and a 
controller.
+If process.roles is not set at all, it is assumed to be 
in ZooKeeper mode.
+  
+
+  Nodes that act as both brokers and controllers are referred to as 
"combined" nodes. Combined nodes are simpler to operate for simple use cases 
like a development environment. The key disadvantage is that the controller 
will be less isolated from the rest of the system. Combined mode is not 
recommended in critical deployment environments.
+
+
+  Controllers
+
+  In KRaft mode, only a small group of specially selected servers can act 
as controllers (unlike the ZooKeeper-based mode, where any server can become 
the Controller). The specially selected controller servers will participate in 
the metadata quorum. Each controller server is either active, or a hot standby 
for the current active controller server.

Review Comment:
   ```suggestion
 In KRaft mode, specific Kafka servers are selected to be controllers 
(unlike the ZooKeeper-based mode, where any server can become the Controller). 
The servers selected to be controllers will participate in the metadata quorum. 
Each controller is either active or a hot standby for the current active 
controller.
   ```



##
docs/ops.html:
##
@@ -3180,6 +3180,119 @@ 6.10 KRaft
+
+  Configuration
+
+  Process Roles
+
+  In KRaft mode each Kafka server can be configured as a controller, as a 
broker or as both using the process.roles property. This property 
can have the following values:
+
+  
+If process.roles is set to broker, the 
server acts as a broker.
+If process.roles is set to controller, the 
server acts as a controller.
+If process.roles is set to 
broker,controller, the server acts as a broker and a 
controller.

Review Comment:
   ```suggestion
   If process.roles is set to 
broker,controller, the server acts as both a broker and a 
controller.
   ```



##
docs/ops.html:
##
@@ -3180,6 +3180,119 @@ 6.10 KRaft
+
+  Configuration
+
+  Process Roles
+
+  In KRaft mode each Kafka server can be configured as a controller, as a 
broker or as both using the process.roles property. This property 
can have the following values:
+
+  
+If process.roles is set to broker, the 
server acts as a broker.
+If process.roles is set to controller, the 
server acts as a controller.
+If process.roles is set to 
broker,controller, the server acts as a broker and a 
controller.
+If process.roles is not set at all, it is assumed to be 
in ZooKeeper mode.
+  
+
+  Nodes that act as both brokers and controllers are referred to as 
"combined" nodes. Combined nodes are simpler to operate for simple use cases 
like a development environment. The key disadvantage is that the controller 
will be less isolated from the rest of the system. Combined mode is not 
recommended in critical deployment environments.
+
+
+  Controllers
+
+  In KRaft mode, only a small group of specially selected servers can act 
as controllers (unlike the ZooKeeper-based mode, where any server can become 
the Controller). The specially selected controller servers will participate in 
the metadata quorum. Each controller server is either active, or a hot standby 
for the current active controller server.
+
+  A Kafka cluster will typically select 3 or 5 servers for this role, 
depending on factors like cost and the number of concurrent failures your 
system should withstand without availability impact. A majority of the 
controllers must be alive in order to maintain availability. With 3 
controllers, the cluster can tolerate 1 controller failure; with 5 controllers, 
the cluster can tolerate 2 controller failures.

Review Comment:
   ```suggestion
 A Kafka admin will typically select 3 or 5 servers for this role, 
depending on factors like cost and the number of concurrent failures your 
system should withstand wi

[jira] [Resolved] (KAFKA-14214) StandardAuthorizer may transiently process ACLs out of write order

2022-09-20 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-14214.
--
Resolution: Fixed

> StandardAuthorizer may transiently process ACLs out of write order
> --
>
> Key: KAFKA-14214
> URL: https://issues.apache.org/jira/browse/KAFKA-14214
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3
>Reporter: Akhilesh Chaganti
>Assignee: Akhilesh Chaganti
>Priority: Blocker
> Fix For: 3.3.0
>
>
> The issue with StandardAuthorizer#authorize is that it looks up 
> aclsByResources (which is of type ConcurrentSkipListMap) twice for every 
> authorize call and uses an Iterator with weak consistency guarantees on top of 
> aclsByResources. This can cause the authorize call to process 
> concurrent writes out of order.
> *Issue 1:*
> When StandardAuthorizer calls into a simple authorize function, we check the 
> ACLs for literal/prefix matches for the resource and then make one more call 
> to check the ACLs for matching wildcard entries. Between the two 
> (checkSection) calls, let’s assume we add a DENY for the resource literal and 
> add an ALLOW ALL wildcard. The first call to check literal/prefix rules will 
> SKIP the DENY ACL since the writes are not yet processed, and the second call 
> would find the ALLOW wildcard entry, which results in ALLOW authorization for 
> the resource when it should actually be DENIED.
> *Issue: 2*
> For authorization, StandardAuthorizer depends on an iterator that iterates 
> through the ordered set of ACLs. The iterator has weak consistency 
> guarantees. So when writes for two ACLs occur, one of the ACLs might already be 
> visible to the iterator while the other is not. 
> Let’s say the two ACLs below are added to the set in the following order.
> Acl1 = StandardAcl(TOPIC, foobar, LITERAL, DENY, READ, user1)
> Acl2 = StandardAcl(TOPIC, foo, PREFIX, ALLOW, READ, user1)
> Depending on the position of the iterator on the ordered set during the write 
> call, the iterator might see only Acl2, which prompts it to ALLOW the topic to 
> be READ even though the DENY rule was written first.
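
A minimal sketch of the read-write-lock approach used in the fix (hypothetical 
names; the real StandardAuthorizer code differs): writes apply ACL changes under 
the write lock, and authorize() runs entirely under the read lock, so both 
lookup passes observe the same, fully ordered view of the ACLs.

{code:java}
import java.util.TreeSet;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class AclStore {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final TreeSet<String> aclsByResource = new TreeSet<>();

    void addAcl(String acl) {
        lock.writeLock().lock();
        try {
            aclsByResource.add(acl);
        } finally {
            lock.writeLock().unlock();
        }
    }

    boolean authorize(String resource) {
        lock.readLock().lock();
        try {
            // Both the literal/prefix pass and the wildcard pass happen under
            // a single read lock, so no write can interleave between them.
            return aclsByResource.contains(resource) || aclsByResource.contains("*");
        } finally {
            lock.readLock().unlock();
        }
    }
}
{code}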



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14243) Temporarily disable unsafe downgrade

2022-09-20 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-14243.
--
Resolution: Fixed

> Temporarily disable unsafe downgrade
> 
>
> Key: KAFKA-14243
> URL: https://issues.apache.org/jira/browse/KAFKA-14243
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 3.3.0
>
>
> Temporarily disable unsafe downgrade since we haven't implemented reloading 
> snapshots on unsafe downgrade



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

2022-09-20 Thread GitBox


philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975863343


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The 
client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with 
type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {
+public K poll();
+public boolean add(T event);

Review Comment:
   Absolutely.  I'm also thinking that the naming can be a bit misleading 
sometimes, because the ResponseEvents might not necessarily be a "response".  As 
far as the implementation goes, I think they are mostly just errors and 
callback events.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

2022-09-20 Thread GitBox


philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975863614


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRequestEvent.java:
##
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+abstract public class ConsumerRequestEvent {

Review Comment:
   Probably not, events like assignment don't really require a response.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

2022-09-20 Thread GitBox


hachikuji commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975893945


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The 
client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with 
type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {
+public K poll();
+public boolean add(T event);

Review Comment:
   I was thinking something like RequestEvent -> ApplicationEvent and 
ResponseEvent -> NetworkEvent. Or something like that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

2022-09-20 Thread GitBox


philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975897905


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The 
client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with 
type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {
+public K poll();
+public boolean add(T event);

Review Comment:
Can we be more direct and call the ResponseEvent the 
BackgroundThreadEvent... because it really is coming from the background 
thread, and it's not necessarily a NetworkEvent.  It can be just a trigger for 
the callback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

2022-09-20 Thread GitBox


hachikuji commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975915821


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {
+public K poll();
+public boolean add(T event);

Review Comment:
   That could work. Maybe `ApplicationEvent` and `BackgroundEvent`?
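   
   For concreteness, a minimal sketch of how the interface could look with these names, assuming a queue-backed implementation (everything below is illustrative, not the committed API):
   
   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   
   // Hypothetical marker types for the two directions of traffic.
   interface ApplicationEvent {}  // produced by the application (polling) thread
   interface BackgroundEvent {}   // produced by the background thread
   
   // Sketch of a queue-backed EventHandler<ApplicationEvent, BackgroundEvent>:
   // add() hands an event to the background thread, poll() drains its replies.
   class QueueBackedEventHandler implements EventHandler<ApplicationEvent, BackgroundEvent> {
       private final BlockingQueue<ApplicationEvent> applicationEvents = new LinkedBlockingQueue<>();
       private final BlockingQueue<BackgroundEvent> backgroundEvents = new LinkedBlockingQueue<>();
   
       @Override
       public BackgroundEvent poll() {
           // Non-blocking: returns null if the background thread has produced nothing yet.
           return backgroundEvents.poll();
       }
   
       @Override
       public boolean add(ApplicationEvent event) {
           return applicationEvents.add(event);
       }
   }
   ```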



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2022-09-20 Thread GitBox


showuon commented on PR #12590:
URL: https://github.com/apache/kafka/pull/12590#issuecomment-1253101672

   @divijvaidya , you're right, thanks for the update! Sorry, I had only checked the broker-side implementation, where, although we create `SessionlessFetchContext`, the fetched records are still returned. I missed the part where we send the final fetch request with "empty" fetchData. So, we're good! Thanks again for the explanation!
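   
   For anyone skimming the thread later, a rough sketch of the mechanism under discussion; the constant and field names are illustrative rather than the exact client internals:
   
   ```java
   // Illustrative sketch of a session-closing fetch request (KIP-227 style
   // fetch sessions): the final epoch plus an empty partition set tells the
   // broker to tear the session down, so no records are fetched or returned.
   public final class CloseFetchSessionSketch {
       static final int FINAL_EPOCH = -1; // signals "close this session"
   
       static final class FetchRequestData {
           final int sessionId;
           final int sessionEpoch;
           final java.util.Set<String> partitions; // empty => nothing to fetch
   
           FetchRequestData(int sessionId, int sessionEpoch, java.util.Set<String> partitions) {
               this.sessionId = sessionId;
               this.sessionEpoch = sessionEpoch;
               this.partitions = partitions;
           }
       }
   
       static FetchRequestData buildCloseRequest(int sessionId) {
           // Empty fetchData: the broker builds a SessionlessFetchContext and
           // returns an empty response instead of fetching records.
           return new FetchRequestData(sessionId, FINAL_EPOCH, java.util.Collections.emptySet());
       }
   }
   ```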
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

2022-09-20 Thread GitBox


guozhangwang commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975965989


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The 
client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with 
type K, and add a request with type T.
+ * @param  Event request type
+ * @param  Event response type
+ */
+public interface EventHandler {
+public K poll();
+public boolean add(T event);

Review Comment:
   I was thinking about `ConsumerEvent` or `CallerEvent`, and 
`CoordinatorEvent`, just to make the naming party worse :P 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon merged pull request #12665: MINOR: Fix typo in info message

2022-09-20 Thread GitBox


showuon merged PR #12665:
URL: https://github.com/apache/kafka/pull/12665


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #12666: KAFKA-14244: Add guard against accidental calls to halt JVM during testing

2022-09-20 Thread GitBox


showuon commented on PR #12666:
URL: https://github.com/apache/kafka/pull/12666#issuecomment-1253120024

   @C0urante , interesting solution. But I'm wondering how you tested it to prove this can really catch `Exit` calls and report an error?
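   
   One common pattern here (a sketch, assuming production code routes JVM termination through Kafka's `Exit` wrapper rather than calling `System.exit`/`Runtime.halt` directly) is to install a test-scoped procedure that fails the test on any termination attempt:
   
   ```java
   import org.apache.kafka.common.utils.Exit;
   
   // Minimal sketch: install guards before the tests run so that any accidental
   // Exit.exit()/Exit.halt() call fails the test instead of killing the JVM.
   public class NoExitGuard {
       public static void install() {
           Exit.setExitProcedure((statusCode, message) -> {
               throw new AssertionError("Unexpected Exit.exit(" + statusCode + "): " + message);
           });
           Exit.setHaltProcedure((statusCode, message) -> {
               throw new AssertionError("Unexpected Exit.halt(" + statusCode + "): " + message);
           });
       }
   
       public static void uninstall() {
           Exit.resetExitProcedure();
           Exit.resetHaltProcedure();
       }
   }
   ```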


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12667: KAFKA-10635: Improve logging with throwing OOOSE

2022-09-20 Thread GitBox


showuon commented on code in PR #12667:
URL: https://github.com/apache/kafka/pull/12667#discussion_r975975334


##
core/src/main/scala/kafka/log/ProducerStateManager.scala:
##
@@ -233,10 +233,16 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
 
   // If there is no current producer epoch (possibly because all producer records have been deleted due to
   // retention or the DeleteRecords API) accept writes with any sequence number
-  if (!(currentEntry.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) {
+  if (currentEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH && !inSequence(currentLastSeq, appendFirstSeq)) {
 throw new OutOfOrderSequenceException(s"Out of order sequence number for producer $producerId at " +
-  s"offset $offset in partition $topicPartition: $appendFirstSeq (incoming seq. number), " +
-  s"$currentLastSeq (current end sequence number)")
+  s"offset $offset in partition $topicPartition: producer request epoch $producerEpoch, " +
+  s"current entry epoch ${currentEntry.producerEpoch}, " +
+  s"updating entry epoch ${updatedEntry.producerEpoch}, " +
+  s"append first seq $appendFirstSeq, " +
+  s"updating entry last seq ${updatedEntry.lastSeq}, " +
+  s"updating batch empty ${updatedEntry.isEmpty}" +

Review Comment:
   nit: updating entry last seq will be `-1` if `updatedEntry.isEmpty`. Is this line still necessary? Or do you think it's possible this value might be changed by another thread?
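   
   As an aside for readers, the wrap-around sequence check referenced here looks roughly like the following (a Java rendering of my reading of the Scala code, not a verbatim copy):
   
   ```java
   // Producer sequence numbers are 32-bit and wrap from Integer.MAX_VALUE to 0,
   // so the "next in sequence" check must allow that single wrap-around case.
   static boolean inSequence(int lastSeq, int nextSeq) {
       // Compare in long space so lastSeq + 1 cannot overflow before comparing.
       return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
   }
   ```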



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

2022-09-20 Thread GitBox


vamossagar12 commented on code in PR #12561:
URL: https://github.com/apache/kafka/pull/12561#discussion_r975980445


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java:
##
@@ -517,13 +517,13 @@ public void testTaskAssignmentWhenWorkerBounces() {
 leaderAssignment = deserializeAssignment(result, leaderId);
 assertAssignment(leaderId, offset,
 Collections.emptyList(), 0,
-Collections.emptyList(), 0,
+Collections.emptyList(), 1,

Review Comment:
   TBH, even I didn't want to re-introduce the flag, but it seemed the easiest way to get around the regression. I guess, as you said, it might be easier to work through the other issues in the allocation algorithm to finally make the flag redundant.
   
   Regarding the side effects of re-introducing this flag, I had imagined that adding it back would break some of the tests, but that didn't happen, which may or may not be a good thing. I did look at the logic again, and comparing it with the original algorithm, it seemed to me that this line:
   
   https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L310
   
   is the one that prevented successive revoking rebalances. The other check here:
   
   https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L312
   
   gives us a window to set it to true and force a revocation in the next round, which made me believe that it should be a safe check (a schematic of the guard is sketched below). That said, if there are scenarios where we think we need testing, I would be happy to do that.
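   
   For readers of the thread, a minimal sketch of the flag-based guard pattern as I understand it; all names are hypothetical illustrations, not the actual `IncrementalCooperativeAssignor` members:
   
   ```java
   // Hypothetical sketch of the "no back-to-back revocations" guard; the real
   // assignor tracks more state, but the flag works roughly like this.
   public class RevocationGuardSketch {
       private boolean revokedInPreviousRound = false;
   
       /** Returns true if this rebalance round is allowed to revoke and does so. */
       public boolean maybeRevoke(boolean loadIsImbalanced) {
           // Guard (cf. line 310): only revoke if the previous round did not,
           // which prevents successive revoking rebalances (rebalance storms).
           if (!revokedInPreviousRound && loadIsImbalanced) {
               revokedInPreviousRound = true; // block a revocation in the next round
               return true;
           }
           // Window (cf. line 312): the flag resets, so a revocation can be
           // forced again in the round after this one.
           revokedInPreviousRound = false;
           return false;
       }
   }
   ```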



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12668: MINOR: Mention deprecation of authorizer flags in security documentation

2022-09-20 Thread GitBox


showuon commented on code in PR #12668:
URL: https://github.com/apache/kafka/pull/12668#discussion_r975997743


##
docs/security.html:
##
@@ -1364,6 +1352,18 @@ 
 Convenience
 
+
+--authorizer
(DEPRECATED: not supported in KRaft) Fully qualified class name of the authorizer.
+kafka.security.authorizer.AclAuthorizer
+Configuration
+
+
+--authorizer-properties
(DEPRECATED: not supported in KRaft) key=val pairs that will be passed to authorizer for initialization. For the default authorizer in ZK clusters, the example values are: zookeeper.connect=localhost:2181
+
+Configuration
+
 
 --zk-tls-config-file
 Identifies the file where ZooKeeper client TLS connectivity properties for the authorizer are defined.

Review Comment:
   Should we also update `zk-tls-config-file` to mention the deprecation?
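   
   For what it's worth, a usage contrast might make the deprecation clearer to docs readers (a sketch; the host, port, and principal below are placeholders):
   
   ```
   # Deprecated, ZooKeeper-based form:
   bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer \
     --authorizer-properties zookeeper.connect=localhost:2181 \
     --add --allow-principal User:alice --operation Read --topic test
   
   # Preferred form, which also works with KRaft clusters:
   bin/kafka-acls.sh --bootstrap-server localhost:9092 \
     --add --allow-principal User:alice --operation Read --topic test
   ```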



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14063) Kafka message parsing can cause ooms with small antagonistic payloads

2022-09-20 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-14063:
--
Fix Version/s: 3.3.0

> Kafka message parsing can cause ooms with small antagonistic payloads
> -
>
> Key: KAFKA-14063
> URL: https://issues.apache.org/jira/browse/KAFKA-14063
> Project: Kafka
>  Issue Type: Bug
>  Components: generator
>Affects Versions: 3.2.0
>Reporter: Daniel Collins
>Priority: Major
> Fix For: 2.8.2, 3.3.0, 3.0.2, 3.1.2, 3.2.3
>
>
> When the parsing code receives a payload for a variable-length field whose
> length is specified as some arbitrarily large number (assume INT32_MAX, for
> example), it will immediately try to allocate an ArrayList to hold that many
> elements, before checking whether this is a reasonable array size given the
> available data.
> The fix for this is to instead throw a runtime exception if the length of a
> variably sized container exceeds the amount of remaining data. Then, the
> worst a user can do is force the server to allocate 8x the size of the actual
> delivered data (if they claim there are N elements for a container of Objects
> (i.e. not a byte string), and each Object bottoms out in an 8-byte pointer in
> the ArrayList's backing array).
> This was identified by fuzzing the Kafka request parsing code.
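
A sketch of the described guard (a hypothetical reader, illustrating the idea rather than Kafka's actual generated parsers):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch of the fix described above: validate the declared element count
// against the bytes actually available before allocating the backing list.
// Assumes every element occupies at least one byte on the wire.
public final class BoundedCollectionRead {
    public static List<Integer> readInt32List(ByteBuffer buf) {
        int declaredLength = buf.getInt();
        if (declaredLength < 0 || declaredLength > buf.remaining()) {
            // An honest payload can never declare more elements than remaining bytes.
            throw new RuntimeException("Tried to allocate a collection of size "
                    + declaredLength + ", but only " + buf.remaining() + " bytes remain");
        }
        List<Integer> elements = new ArrayList<>(declaredLength);
        for (int i = 0; i < declaredLength; i++) {
            elements.add(buf.getInt());
        }
        return elements;
    }
}
```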



--
This message was sent by Atlassian Jira
(v8.20.10#820010)