[GitHub] [kafka] dengziming commented on a diff in pull request #12265: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating
dengziming commented on code in PR #12265:
URL: https://github.com/apache/kafka/pull/12265#discussion_r915514842

## core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala: ##
@@ -45,26 +45,37 @@ class BrokerMetadataSnapshotter(
    */
   private var _currentSnapshotOffset = -1L
 
+  /**
+   * The offset of the newest snapshot, or -1 if there hasn't been one. Accessed only under
+   * the object lock.
+   */
+  private var _latestSnapshotOffset = -1L
+
   /**
    * The event queue which runs this listener.
    */
   val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
 
   override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean = synchronized {
-    if (_currentSnapshotOffset == -1L) {
+    if (_currentSnapshotOffset != -1) {
+      warn(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +

Review Comment:
   I agree with you here, info level is more suitable.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #12337: KAFKA-10199: Remove main consumer from store changelog reader
guozhangwang commented on PR #12337:
URL: https://github.com/apache/kafka/pull/12337#issuecomment-1177071757

> @guozhangwang Didn’t we decide to use “requireStable” in the KIP?

I must have reverted that commit by mistake when I tried to revert the scripting-related code. I will file a hotfix commit to fix it.
[GitHub] [kafka] showuon commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log
showuon commented on PR #12274:
URL: https://github.com/apache/kafka/pull/12274#issuecomment-1176986634

> I think this will unfence the broker at startup even if the broker hasn't applied the snapshot or any of the log records, right?

Currently, we replay the metadata records when the metadata listener receives new records. So yes, if we just return the current LEO, the records/snapshots might not have been applied yet.

Sorry, it's easy to reject others' proposals but difficult to come up with another solution. If we don't have a better solution, maybe we can try the originally proposed one?

```
One solution to this problem is to require the broker to only catch up to the last committed offset when they last sent the heartbeat. For example:

Broker sends a heartbeat with current offset of Y. The last commit offset is X. The controller remember this last commit offset, call it X'
Broker sends another heartbeat with current offset of Z. Unfence the broker if Z >= X or Z >= X'.
```

And again, thanks for continuing to work on this difficult issue, @dengziming!
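For readers following the thread, the heartbeat rule quoted above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the controller's actual code: the class `UnfenceTracker`, the method `onHeartbeat`, and the per-broker map are hypothetical names introduced here, and the sketch assumes the controller remembers one committed offset per broker.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical controller-side helper; not Kafka's actual implementation. */
final class UnfenceTracker {
    // brokerId -> committed offset recorded at that broker's previous heartbeat (X' in the proposal).
    private final Map<Integer, Long> rememberedCommitted = new HashMap<>();

    /**
     * Handles one heartbeat and reports whether the broker may be unfenced.
     *
     * @param brokerOffset  the broker's current metadata offset (Y or Z in the proposal)
     * @param lastCommitted the controller's last committed offset right now (X in the proposal)
     */
    boolean onHeartbeat(int brokerId, long brokerOffset, long lastCommitted) {
        Long remembered = rememberedCommitted.get(brokerId);
        // Unfence if Z >= X, or if Z >= X' where X' was remembered from the previous heartbeat.
        boolean caughtUp = brokerOffset >= lastCommitted
            || (remembered != null && brokerOffset >= remembered);
        // Remember the current committed offset for this broker's next heartbeat.
        rememberedCommitted.put(brokerId, lastCommitted);
        return caughtUp;
    }
}
```

With a busy metadata log, a broker at offset 12 would never satisfy `Z >= X` if the committed offset has already moved to 20, but it would pass the `Z >= X'` check if the committed offset was 10 at its previous heartbeat.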
[GitHub] [kafka] dajac commented on pull request #12337: KAFKA-10199: Remove main consumer from store changelog reader
dajac commented on PR #12337:
URL: https://github.com/apache/kafka/pull/12337#issuecomment-1176984813

@guozhangwang Didn’t we decide to use “requireStable” in the KIP?
[GitHub] [kafka] dajac commented on a diff in pull request #12337: KAFKA-10199: Remove main consumer from store changelog reader
dajac commented on code in PR #12337:
URL: https://github.com/apache/kafka/pull/12337#discussion_r915401645

## clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java: ##
@@ -44,10 +45,21 @@ public ListConsumerGroupOffsetsOptions topicPartitions(List topi
         return this;
     }
 
+    /**
+     * Sets an optional requireStable flag.
+     */
+    public void requireStable(final boolean requireStable) {
+        this.requireStable = requireStable;
+    }
+
     /**
      * Returns a list of topic partitions to add as part of the result.
      */
     public List topicPartitions() {
         return topicPartitions;
     }
+
+    public boolean shouldRequireStable() {

Review Comment:
   Didn’t we settle on using requireStable in the KIP?
[GitHub] [kafka] showuon commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically
showuon commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r915386298

## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ##
@@ -1854,6 +1853,33 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time) {
+      // shutdown() and startup() are called in LogCleaner.reconfigure().
+      // Empty startup() and shutdown() to ensure that no unnecessary log cleaner threads remain after this test.
+      override def startup(): Unit = {}
+      override def shutdown(): Unit = {}
+    }
+
+    try {
+      assertEquals(1000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be initialized from initial `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+
+      val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+      newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 2000)
+
+      logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))
+
+      assertEquals(2000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be updated with new `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+    } finally logCleaner.shutdown();

Review Comment:
   nit: (1) no semicolon is needed (2) the format in Kafka is usually like this:
   ```
   finally {
     logCleaner.shutdown()
   }
   ```
[GitHub] [kafka] mjsax commented on a diff in pull request #12188: KAFKA-10892: Shared Readonly State Stores
mjsax commented on code in PR #12188: URL: https://github.com/apache/kafka/pull/12188#discussion_r915376733 ## streams/src/main/java/org/apache/kafka/streams/Topology.java: ## @@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder storeBuilder, return this; } +/** + * Adds a Read Only {@link StateStore} to the topology. + * Review Comment: nit: do we need a `` tag here? ## streams/src/main/java/org/apache/kafka/streams/Topology.java: ## @@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder storeBuilder, return this; } +/** + * Adds a Read Only {@link StateStore} to the topology. Review Comment: nit: `read-only` ## streams/src/main/java/org/apache/kafka/streams/Topology.java: ## @@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder storeBuilder, return this; } +/** + * Adds a Read Only {@link StateStore} to the topology. + * + * A Read Only StateStore can use any compacted topic as a changelog. Review Comment: Proposal: ``` A read-only state store uses its input topic for fault-tolerance. Thus, in contrast to regular state stores, it must never create an internal changelog topic. Therefore, the input topic should be configured with log compaction. ``` ## streams/src/main/java/org/apache/kafka/streams/Topology.java: ## @@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder storeBuilder, return this; } +/** + * Adds a Read Only {@link StateStore} to the topology. + * + * A Read Only StateStore can use any compacted topic as a changelog. + * + * A {@link SourceNode} will be added to consume the data arriving from the partitions of the input topic. + * + * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all + * records forwarded from the {@link SourceNode}. + * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date. 
+ * + * @param storeBuilder user defined key value store builder Review Comment: If we are limited to kv-store, should we change the type to `StoreBuilder` (or similar)? ## streams/src/main/java/org/apache/kafka/streams/Topology.java: ## @@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder storeBuilder, return this; } +/** + * Adds a Read Only {@link StateStore} to the topology. + * + * A Read Only StateStore can use any compacted topic as a changelog. + * + * A {@link SourceNode} will be added to consume the data arriving from the partitions of the input topic. + * + * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all + * records forwarded from the {@link SourceNode}. + * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date. + * + * @param storeBuilder user defined key value store builder + * @param sourceNamename of the {@link SourceNode} that will be automatically added + * @param timestampExtractorthe stateless timestamp extractor used for this source, + * if not specified the default extractor defined in the configs will be used + * @param keyDeserializer the {@link Deserializer} to deserialize keys with + * @param valueDeserializer the {@link Deserializer} to deserialize values with + * @param topic the topic to source the data from + * @param processorName the name of the {@link ProcessorSupplier} + * @param stateUpdateSupplier the instance of {@link ProcessorSupplier} + * @return itself + * @throws TopologyException if the processor of state is already registered + */ +public synchronized Topology addReadOnlyStateStore(final StoreBuilder storeBuilder, + final String sourceName, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String topic, + final String processorName, + final ProcessorSupplier stateUpdateSupplier) { +if (storeBuilder.loggingEnabled()) { 
+// -- disabling logging. We might want to print some logging. +
[GitHub] [kafka] guozhangwang opened a new pull request, #12387: KAFKA-10199: Add RESUME in task updater
guozhangwang opened a new pull request, #12387:
URL: https://github.com/apache/kafka/pull/12387

This should be reviewed after https://github.com/apache/kafka/pull/12386.

1) Need to check `enforceRestoreActive` / `transitToUpdateStandby` when resuming a paused task.
2) Do not expose another `getResumedTasks`, since I think its caller only needs `getPausedTasks`.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] guozhangwang opened a new pull request, #12386: KAFKA-10199: Add PAUSE in task updater
guozhangwang opened a new pull request, #12386:
URL: https://github.com/apache/kafka/pull/12386

1. Add a pause action to the task updater.
2. When removing a task, also check the paused tasks in addition to the removed tasks.
3. I also realized that we do not check whether a task with the same id has already been added, so I added that check in this PR as well.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
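The three points in this PR description can be pictured with a small bookkeeping sketch. It is not the code from the PR: `TaskUpdaterSketch`, its string task ids, and the two sets are hypothetical stand-ins, and it assumes that removal looks in both the updating and the paused collections.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical bookkeeping for a task updater; names and types are illustrative only. */
final class TaskUpdaterSketch {
    private final Set<String> updating = new HashSet<>();
    private final Set<String> paused = new HashSet<>();

    /** Adds a task, rejecting an id that is already tracked (point 3). */
    void add(String taskId) {
        if (updating.contains(taskId) || paused.contains(taskId)) {
            throw new IllegalStateException("Task " + taskId + " has already been added");
        }
        updating.add(taskId);
    }

    /** Moves a task from updating to paused (point 1); returns false if it was not updating. */
    boolean pause(String taskId) {
        if (!updating.remove(taskId)) {
            return false;
        }
        paused.add(taskId);
        return true;
    }

    /** Removes a task wherever it currently lives, including the paused set (point 2). */
    boolean remove(String taskId) {
        return updating.remove(taskId) || paused.remove(taskId);
    }
}
```

Keeping the duplicate check in `add` means a paused task cannot be silently re-added while it is still tracked.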
[GitHub] [kafka] mjsax commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window
mjsax commented on code in PR #12370:
URL: https://github.com/apache/kafka/pull/12370#discussion_r915364409

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java: ##
@@ -289,7 +289,10 @@ private StoreBuilder> materialize(final MaterializedInt
         // do not enable cache if the emit final strategy is used
         if (materialized.cachingEnabled() && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
             builder.withCachingEnabled();
+        } else {
+            builder.withCachingDisabled();

Review Comment:
   Why do we need this call? I thought we only add tests in this PR? Is the feature not completed yet and we need this?
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12338: KAFKA-10199: Cleanup TaskManager and Task interfaces
guozhangwang commented on code in PR #12338:
URL: https://github.com/apache/kafka/pull/12338#discussion_r915368085

## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ##
@@ -337,7 +369,7 @@ void addToSuccessfullyProcessed(final Task task) {
         successfullyProcessed.add(task);
     }
 
-    void removeTaskFromCuccessfullyProcessedBeforeClosing(final Task task) {
+    void removeTaskFromSuccessfullyProcessedBeforeClosing(final Task task) {

Review Comment:
   Fixed a typo in function name.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12338: KAFKA-10199: Cleanup TaskManager and Task interfaces
guozhangwang commented on code in PR #12338: URL: https://github.com/apache/kafka/pull/12338#discussion_r915355254 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ## @@ -83,7 +83,6 @@ class DefaultStateUpdaterTest { private final Time time = new MockTime(1L); private final StreamsConfig config = new StreamsConfig(configProps()); private final ChangelogReader changelogReader = mock(ChangelogReader.class); -private final java.util.function.Consumer> offsetResetter = topicPartitions -> { }; Review Comment: This is a leftover from previous commit, we do not need this anymore. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ## @@ -34,29 +33,41 @@ import java.util.TreeMap; import java.util.stream.Collectors; +import static org.apache.kafka.common.utils.Utils.filterMap; +import static org.apache.kafka.common.utils.Utils.union; + +/** + * All tasks contained by the Streams instance. + * + * Note that these tasks are shared between the TaskManager (stream thread) and the StateUpdater (restore thread), + * i.e. all running active tasks are processed by the former and all restoring active tasks and standby tasks are + * processed by the latter. + */ class Tasks { private final Logger log; private final TopologyMetadata topologyMetadata; -private final Map allTasksPerId = Collections.synchronizedSortedMap(new TreeMap<>()); -private final Map readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId); -private final Collection readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values()); - // TODO: change type to `StreamTask` private final Map activeTasksPerId = new TreeMap<>(); +// TODO: change type to `StandbyTask` +private final Map standbyTasksPerId = new TreeMap<>(); + +// Tasks may have been assigned for a NamedTopology that is not yet known by this host. 
When that occurs we stash +// these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or +// we receive a new assignment and they are revoked from the thread. + +// Tasks may have been assigned but not yet created because: +// 1. They are for a NamedTopology that is yet known by this host. +// 2. They are to be recycled from an existing restoring task yet to be returned from the task updater. +// +// When that occurs we stash these pending tasks until either they are finally clear to be created, +// or they are revoked from a new assignment. +private final Map> pendingActiveTasks = new HashMap<>(); Review Comment: This is mainly for case 1) in the description. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java: ## @@ -182,7 +159,7 @@ public Collection createTasks(final Consumer consumer, partitions ); -final InternalProcessorContext context = new ProcessorContextImpl( +final InternalProcessorContext context = new ProcessorContextImpl( Review Comment: Minor cleanup. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -285,34 +286,46 @@ public void handleAssignment(final Map> activeTasks, final LinkedHashMap taskCloseExceptions = new LinkedHashMap<>(); final Map> activeTasksToCreate = new HashMap<>(activeTasks); final Map> standbyTasksToCreate = new HashMap<>(standbyTasks); +final Map> tasksToRecycle = new HashMap<>(); final Comparator byId = Comparator.comparing(Task::id); -final Set tasksToRecycle = new TreeSet<>(byId); final Set tasksToCloseClean = new TreeSet<>(byId); final Set tasksToCloseDirty = new TreeSet<>(byId); -// first rectify all existing tasks +tasks.purgePendingTasks(activeTasks.keySet(), standbyTasks.keySet()); + +// first rectify all existing tasks: +// 1. for tasks that are already owned, just resume and skip re-creating them +// 2. 
for tasks that have changed active/standby status, just recycle and skip re-creating them +// 3. otherwise, close them since they are no longer owned. for (final Task task : tasks.allTasks()) { -if (activeTasks.containsKey(task.id()) && task.isActive()) { -tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id())); -activeTasksToCreate.remove(task.id()); -} else if (standbyTasks.containsKey(task.id()) && !task.isActive()) { -tasks.updateInputPartitionsAndResume(task, standbyTasks.get(task.id())); -standbyTasksToCreate.remove(task.id()); -} else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) { -// check for tasks that were owned pre
[jira] [Commented] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join
[ https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563510#comment-17563510 ]

Matthias J. Sax commented on KAFKA-14049:
-----------------------------------------

{quote}Null Values in the Stream for a Left Join would indicate a Tombstone Message{quote}

This does not make sense to me. A KStream is an event/fact stream, and thus there are no delete/tombstone semantics. Only _changelogs_ have tombstone semantics, but changelogs are modeled as `KTables` in Kafka Streams.

I still tend to agree that the current semantics are not ideal, but they are hard to fix without introducing ambiguity (and the reason to drop such records is to avoid ambiguity).

I guess a current workaround would be to implement the join using a `transform()` (or similar); it should not be too complicated.

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> ------------------------------------------------------------
>
>                 Key: KAFKA-14049
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14049
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Saumya Gupta
>            Priority: Major
>              Labels: beginner, newbie
>
> Null Values in the Stream for a Left Join would indicate a Tombstone Message that needs to be propagated if not actually joined with the GlobalKTable message, hence these messages should not be ignored.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
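To make the suggested workaround concrete, the following plain-Java sketch shows the semantics a hand-written left join could implement. It deliberately does not use the Kafka Streams API: `ManualLeftJoin`, the in-memory `Map` standing in for the GlobalKTable, and the string output format are all assumptions made for illustration. The point is only that a manual join can forward a record whose value is null instead of dropping it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative only: an in-memory stand-in for a stream-to-global-table left join. */
final class ManualLeftJoin {
    /**
     * Joins each stream record against the table by key.
     * A missing table entry gives null on the right side (left-join semantics),
     * and a null stream value is forwarded rather than dropped.
     */
    static List<String> join(List<Map.Entry<String, String>> stream, Map<String, String> table) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> record : stream) {
            String key = record.getKey();
            // Guard against null keys, since some Map implementations reject get(null).
            String right = key == null ? null : table.get(key);
            out.add(key + ":" + record.getValue() + "|" + right);
        }
        return out;
    }
}
```

In a real topology the loop body would live inside the processor or transformer, with the table lookup going through a state store; how the downstream consumer should interpret a forwarded null is exactly the ambiguity discussed in the comment above.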
[jira] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join
[ https://issues.apache.org/jira/browse/KAFKA-14049 ]

Matthias J. Sax deleted comment on KAFKA-14049:
-----------------------------------------------

was (Author: mjsax):
Sounds like a duplicate of https://issues.apache.org/jira/browse/KAFKA-12317 ?
[jira] [Commented] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join
[ https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563509#comment-17563509 ]

Matthias J. Sax commented on KAFKA-14049:
-----------------------------------------

Sounds like a duplicate of https://issues.apache.org/jira/browse/KAFKA-12317 ?
[GitHub] [kafka] C0urante commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure
C0urante commented on code in PR #12320: URL: https://github.com/apache/kafka/pull/12320#discussion_r915350266 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java: ## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.connect.runtime.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import javax.ws.rs.core.Response; +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.niceMock; +import static org.easymock.EasyMock.replay; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class RestClientTest { + +private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); +private static final TypeReference TEST_TYPE = new TypeReference() { +}; +private HttpClient httpClient; + +private static String toJsonString(Object obj) { +try { +return OBJECT_MAPPER.writeValueAsString(obj); +} catch (JsonProcessingException e) { +throw new RuntimeException(e); +} +} + +private static RestClient.HttpResponse httpRequest(HttpClient httpClient, TypeReference typeReference) { +return RestClient.httpRequest( +httpClient, null, null, null, null, typeReference, null, null); +} + +@BeforeEach +public void mockSetup() { +httpClient = niceMock(HttpClient.class); +} + +@Test +public 
void testSuccess() throws ExecutionException, InterruptedException, TimeoutException { +int statusCode = Response.Status.OK.getStatusCode(); +String expectedResponse = toJsonString(new TestDTO("someContent")); +setupHttpClient(statusCode, expectedResponse); + +RestClient.HttpResponse httpResp = httpRequest(httpClient, TEST_TYPE); +assertEquals(httpResp.status(), statusCode); +assertEquals(toJsonString(httpResp.body()), expectedResponse); +} + +@Test +public void testNoContent() throws ExecutionException, InterruptedException, TimeoutException { +int statusCode = Response.Status.NO_CONTENT.getStatusCode(); +setupHttpClient(statusCode, null); + +RestClient.HttpResponse httpResp = httpRequest(httpClient, TEST_TYPE); +assertEquals(httpResp.status(), statusCode); +assertNull(httpResp.body()); +} + +@Test +public void testError() throws ExecutionException, InterruptedException, TimeoutException { Review Comment: I think we should hold off on using Junit 5 for now; these changes are fine as-are. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure
C0urante commented on code in PR #12320: URL: https://github.com/apache/kafka/pull/12320#discussion_r915347260 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java: ## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.connect.runtime.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import javax.crypto.SecretKey; +import javax.ws.rs.core.Response; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(Enclosed.class) +public class RestClientTest { + +private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); +private static final TypeReference TEST_TYPE = new TypeReference() { +}; +private static final SecretKey MOCK_SECRET_KEY = getMockSecretKey(); + +private static void assertIsInternalServerError(ConnectRestException e) { +assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode()); +assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.errorCode()); +} + +private 
static SecretKey getMockSecretKey() { +SecretKey mockKey = mock(SecretKey.class); +when(mockKey.getFormat()).thenReturn("RAW");// supported format by + when(mockKey.getEncoded()).thenReturn("SomeKey".getBytes(StandardCharsets.UTF_8)); +return mockKey; +} + +private static RestClient.HttpResponse httpRequest(HttpClient httpClient, String requestSignatureAlgorithm) { +return RestClient.httpRequest( +httpClient, +"https://localhost:1234/api/endpoint";, +"GET", +null, +new TestDTO("requestBodyData"), +TEST_TYPE, +MOCK_SECRET_KEY, +requestSignatureAlgorithm); +} + +private static RestClient.HttpResponse httpRequest(HttpClient httpClient) { +String validRequestSignatureAlgorithm = "HmacSHA1"; +return httpRequest(httpClient, validRequestSignatureAlgorithm); +} + + +@RunWith(Parameterized.class) +public static class RequestFailureParameterizedTest { +private final HttpClient httpClient = mock(HttpClient.class); + +@Parameterized.Parameter +public Throwable requestException; + +@Parameterized.Parameters +public static Collection requestExceptions() { +return Arrays.asList(new Object[][]{ +{new InterruptedException()}, +{new ExecutionException(null)}, +{new TimeoutException()} +}); +} + +private static Request buildThrowingMockRequest(Throwable t) throws ExecutionException, InterruptedException, TimeoutException { +Request req = mock(Request.class); +when(req.send()).thenThrow(t); +return req; +} + +@Test +public void testFailureDuringRequestCausesInternalServerError() thro
[GitHub] [kafka] guozhangwang merged pull request #12337: KAFKA-10199: Remove main consumer from store changelog reader
guozhangwang merged PR #12337: URL: https://github.com/apache/kafka/pull/12337 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stan-confluent commented on a diff in pull request #12120: Add mini test
stan-confluent commented on code in PR #12120:
URL: https://github.com/apache/kafka/pull/12120#discussion_r915331214

## tests/kafkatest/tests/core/mini_test.py: ##
@@ -0,0 +1,18 @@
+from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+
+
+class MiniTest(Test):
+    def __init__(self, test_context):
+        super(MiniTest, self).__init__(test_context=test_context)
+
+        self.zk = ZookeeperService(test_context, 1)
+        self.kafka = KafkaService(test_context, 1, self.zk)
+
+    @cluster(num_nodes=2)
+    def test(self):
+        self.zk.start()

Review Comment:
Yeah, sorry, this slipped under my radar. It's a good idea, I'll add it - but how can we push this PR to older versions if it has kraft? Would it have to be a separate PR for older versions?
[GitHub] [kafka] hachikuji commented on a diff in pull request #12265: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating
hachikuji commented on code in PR #12265:
URL: https://github.com/apache/kafka/pull/12265#discussion_r915323373

## core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala: ##
@@ -45,26 +45,37 @@ class BrokerMetadataSnapshotter(
    */
   private var _currentSnapshotOffset = -1L

+  /**
+   * The offset of the newest snapshot, or -1 if there hasn't been one. Accessed only under
+   * the object lock.
+   */
+  private var _latestSnapshotOffset = -1L

Review Comment:
Do we need to initialize this on startup? If we don't, is it possible that we would end up snapshotting the same offset more than once?

## core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala: ##
@@ -45,26 +45,37 @@ class BrokerMetadataSnapshotter(
    */
   private var _currentSnapshotOffset = -1L

+  /**
+   * The offset of the newest snapshot, or -1 if there hasn't been one. Accessed only under
+   * the object lock.
+   */
+  private var _latestSnapshotOffset = -1L
+
   /**
    * The event queue which runs this listener.
    */
   val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))

   override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean = synchronized {
-    if (_currentSnapshotOffset == -1L) {
+    if (_currentSnapshotOffset != -1) {
+      warn(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +

Review Comment:
Do you think the log level is right for these messages? It seems like these are "normal" things that can happen and not an indication of a problem. Perhaps we can make them info or debug level.
[GitHub] [kafka] methodmissing opened a new pull request, #12385: Expose client information on RequestContext as additional public API beyond request logs (continuation of KIP 511)
methodmissing opened a new pull request, #12385:
URL: https://github.com/apache/kafka/pull/12385

[KIP 511](https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers) introduced a [ClientInformation](https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java) class that wraps the software (client) name and version and is also set as a property on [RequestContext](https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java), but unfortunately there is no getter to retrieve this information.

The [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers) implemented protocol support, a registry to set it at the network layer per session, and integration with [RequestContext](https://github.com/apache/kafka/blob/d35283f011a797902fc9c4d896a1a6f039eb7d06/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java#L101), but unfortunately the only "public API" for this information is the broker request logs.

This change also exposes client information to custom authorisers via `RequestContext`, where it can be used programmatically in a pluggable fashion.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-14052) Download verification directions are incorrect for linux
[ https://issues.apache.org/jira/browse/KAFKA-14052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

M Sesterhenn updated KAFKA-14052:
Description:
[https://www.apache.org/info/verification.html]

The above is linked to from the kafka download page ([https://kafka.apache.org/downloads]), and it contains incorrect instructions for verifying the release.

The .sha512 files for the downloads are all in this format:
{code:java}
kafka_2.13-3.2.0.tgz: 736A1298 23B058DC 10788D08 93BDE47B 6F39B9E4 972F9EAC 2D5C9E85 E51E4773 44C6F1E1 EBD126CE 34D5FD43 0EB07E55 FDD60D60 CB541F1D 48655C0E BC0A4778
{code}
These files cannot be used to easily verify the expected hash using the procedure described in the verification website. The website says to use:
{code:java}
sha512sum file
{code}
...which doesn't do any hash comparison; it only tells you what the file's hash is, and it is up to the user to manually compare its output with the differently formatted output in the .sha512 file, which is error-prone and a chore.

Expected result: I would expect to be able to do
{code:java}
sha512sum -c file
{code}
...like any normal download.

If the format of the .sha512 files cannot be changed to be compatible with the linux shasum program, then please update the website to describe the proper way to compare hashes. The best way seems to be a script like this:
{code:java}
SHA=$(mktemp); gpg --print-md SHA512 $FILE > $SHA && diff $SHA $FILE.sha512 && echo "SHA checks out OK."
{code}
(where FILE is the downloaded tarball.)

I looked into providing a PR for the verification page, but that is an Apache-wide web page and probably is not publicly available.

> Download verification directions are incorrect for linux
> Key: KAFKA-14052
> URL: https://issues.apache.org/jira/browse/KAFKA-14052
> Project: Kafka
> Issue Type: Bug
> Components: documentation
> Environment: website
> Reporter: M Sesterhenn
> Priority: Major
[jira] [Created] (KAFKA-14052) Download verification directions are incorrect for linux
M Sesterhenn created KAFKA-14052: Summary: Download verification directions are incorrect for linux Key: KAFKA-14052 URL: https://issues.apache.org/jira/browse/KAFKA-14052 Project: Kafka Issue Type: Bug Components: documentation Environment: website Reporter: M Sesterhenn [https://www.apache.org/info/verification.html] The above is linked to from the kafka download page ([https://kafka.apache.org/downloads]), and it contains incorrect instructions for verifying the release. The .sha512 files for the downloads are all in this format: {code:java} kafka_2.13-3.2.0.tgz: 736A1298 23B058DC 10788D08 93BDE47B 6F39B9E4 972F9EAC 2D5C9E85 E51E4773 44C6F1E1 EBD126CE 34D5FD43 0EB07E55 FDD60D60 CB541F1D 48655C0E BC0A4778 {code} These files cannot be used to easily verify the expected hash using the procedure described in the verification website. The website says to use: {code:java} sha512sum file {code} ...which doesn't do any hash comparison; it only tells you what the file's hash is, and it is up to the user to manually compare its output with the differently formatted output in the .sha512 file, which is error-prone and a chore. Expected result: I would expect to be able to do {code:java} sha512sum -c file{code} ...like any normal download. If the format of the .sha512 files cannot be changed to be compatible with the linux shasum program, then please update the website to describe the proper way to compare hashes. The best way seems to be a script like this: {code:java} SHA=$(mktemp); gpg --print-md SHA512 $FILE > $SHA && diff $SHA $FILE.sha512 && echo "SHA checks out OK." {code} (where FILE is the downloaded tarball.) I looked into providing a PR for the verification page, but that is an Apache-wide web page and probably is not publicly available. -- This message was sent by Atlassian Jira (v8.20.10#820010)
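For reference, the comparison the reporter asks for can also be scripted without depending on `diff` seeing byte-identical formatting. The following is a minimal Python sketch, assuming the .sha512 file has the `name: HEX HEX ...` layout shown above (possibly wrapped over several lines) and that the file name contains no colon. The function names `expected_digest`, `actual_digest` and `verify` are made up for illustration and are not part of any Apache tooling.

```python
import hashlib
import re


def expected_digest(sha512_text: str) -> str:
    """Extract the digest from 'name: HEX HEX ...' text as lowercase hex."""
    # Assumption: everything after the first colon is the digest,
    # split into space-separated groups and possibly wrapped.
    _, _, digest_part = sha512_text.partition(":")
    return re.sub(r"\s+", "", digest_part).lower()


def actual_digest(path: str) -> str:
    """Compute the SHA-512 of a file, reading it in 1 MiB chunks."""
    h = hashlib.sha512()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()


def verify(path: str, sha512_path: str) -> bool:
    """Return True if the file's SHA-512 matches the one in the .sha512 file."""
    with open(sha512_path, encoding="ascii") as f:
        return expected_digest(f.read()) == actual_digest(path)
```

With the tarball and its .sha512 file in the current directory, `verify("kafka_2.13-3.2.0.tgz", "kafka_2.13-3.2.0.tgz.sha512")` would return True only when the digests match. This checks integrity only; it does not replace verifying the PGP signature.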
[jira] [Assigned] (KAFKA-14051) KRaft remote controllers do not create metrics reporters
[ https://issues.apache.org/jira/browse/KAFKA-14051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ron Dagostino reassigned KAFKA-14051:
Assignee: Ron Dagostino

> KRaft remote controllers do not create metrics reporters
> Key: KAFKA-14051
> URL: https://issues.apache.org/jira/browse/KAFKA-14051
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Affects Versions: 3.3
> Reporter: Ron Dagostino
> Assignee: Ron Dagostino
> Priority: Major
>
> KRaft remote controllers (KRaft nodes with the configuration value process.roles=controller) do not create the configured metrics reporters defined by the configuration key metric.reporters. The reason is that KRaft remote controllers are not wired up for dynamic config changes, and the creation of the configured metric reporters actually happens during the wiring up of the broker for dynamic reconfiguration, in the invocation of DynamicBrokerConfig.addReconfigurables(KafkaBroker).
[jira] [Created] (KAFKA-14051) KRaft remote controllers do not create metrics reporters
Ron Dagostino created KAFKA-14051:
Summary: KRaft remote controllers do not create metrics reporters
Key: KAFKA-14051
URL: https://issues.apache.org/jira/browse/KAFKA-14051
Project: Kafka
Issue Type: Bug
Components: kraft
Affects Versions: 3.3
Reporter: Ron Dagostino

KRaft remote controllers (KRaft nodes with the configuration value process.roles=controller) do not create the configured metrics reporters defined by the configuration key metric.reporters. The reason is that KRaft remote controllers are not wired up for dynamic config changes, and the creation of the configured metric reporters actually happens during the wiring up of the broker for dynamic reconfiguration, in the invocation of DynamicBrokerConfig.addReconfigurables(KafkaBroker).
[jira] [Resolved] (KAFKA-14032) Dequeue time for forwarded requests is ignored to set
[ https://issues.apache.org/jira/browse/KAFKA-14032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-14032.
Fix Version/s: 3.3.0
Resolution: Fixed

> Dequeue time for forwarded requests is ignored to set
> Key: KAFKA-14032
> URL: https://issues.apache.org/jira/browse/KAFKA-14032
> Project: Kafka
> Issue Type: Bug
> Reporter: Feiyan Yu
> Priority: Minor
> Fix For: 3.3.0
>
> It seems that `requestDequeueTimeNanos` is never set for forwarded requests.
> As a property of a `Request` object, `requestDequeueTimeNanos` is set only when a handler polls the request from `requestQueue` and handles it. However, a handler polls the envelope request only once but calls the handle method twice, so `requestDequeueTimeNanos` is never set for the parsed forwarded request.
> The parsed envelope requests therefore have `requestDequeueTimeNanos` = -1, which affects the correctness of the `LocalTimeMs` statistics and metrics.
[GitHub] [kafka] hachikuji merged pull request #12360: KAFKA-14032: Dequeue time for forwarded requests is ignored to set
hachikuji merged PR #12360: URL: https://github.com/apache/kafka/pull/12360
[jira] [Commented] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch
[ https://issues.apache.org/jira/browse/KAFKA-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563415#comment-17563415 ]

Jason Gustafson commented on KAFKA-14050:

This might be a false alarm. We may have been running a client with a version in between releases. In 2.7.0, the field has the right int64 type: [https://github.com/apache/kafka/blob/2.7.0/clients/src/main/resources/common/message/ApiVersionsResponse.json]. In 2.6, the field does not exist: [https://github.com/apache/kafka/blob/2.6/clients/src/main/resources/common/message/ApiVersionsResponse.json].

> Older clients cannot deserialize ApiVersions response with finalized feature epoch
> Key: KAFKA-14050
> URL: https://issues.apache.org/jira/browse/KAFKA-14050
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Priority: Blocker
> Fix For: 3.3.0
>
> When testing kraft locally, we encountered this exception from an older client:
> {code:java}
> [ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1394':
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'api_keys': Error reading array of size 1207959552, only 579 bytes available
>     at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
>     at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
>     at org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
>     at org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
>     at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
>     at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
>     at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
>     at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
>     at java.base/java.lang.Thread.run(Thread.java:832)
> {code}
> The cause appears to be from a change to the type of the `FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to int64: [https://github.com/apache/kafka/pull/9001/files#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bR58].
> Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by creating a new field. We will have to leave the existing tag in the protocol spec and consider it dead.
> Credit for this find goes to [~dajac].
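The remark that a tagged field lets the fix be made by adding a new field and retiring the old tag can be illustrated with a toy encoding. This is only a sketch and not Kafka's wire format: real tagged fields use varint tags and sizes, whereas the code below assumes one byte each, the tag numbers 1 and 2 are hypothetical, and how a reader reacts to a size mismatch is also an assumption.

```python
import struct


def encode_tagged(fields):
    """Encode {tag: payload_bytes} as repeated (tag, size, payload) records.

    Simplification: tag and size are one unsigned byte each.
    """
    out = b""
    for tag, payload in sorted(fields.items()):
        out += struct.pack(">BB", tag, len(payload)) + payload
    return out


def decode_old_reader(buf, known):
    """Decode with an older reader.

    `known` maps tag -> struct format the old reader expects.
    Tags it does not know are skipped using the size prefix.
    """
    result, pos = {}, 0
    while pos < len(buf):
        tag, size = struct.unpack_from(">BB", buf, pos)
        pos += 2
        payload = buf[pos:pos + size]
        pos += size
        if tag not in known:
            continue  # unknown tag: skipped without error
        fmt = known[tag]
        if struct.calcsize(fmt) != size:
            raise ValueError(
                f"tag {tag}: expected {struct.calcsize(fmt)} bytes, got {size}"
            )
        result[tag] = struct.unpack(fmt, payload)[0]
    return result


# Hypothetical old reader: knows tag 1 as a 4-byte big-endian integer.
OLD_KNOWN = {1: ">i"}

# Widening tag 1 in place to 8 bytes is rejected by the old reader.
widened_in_place = encode_tagged({1: struct.pack(">q", 42)})

# Retiring tag 1 and sending the 8-byte value under new tag 2 is tolerated.
moved_to_new_tag = encode_tagged({2: struct.pack(">q", 42)})
```

In this sketch `decode_old_reader(moved_to_new_tag, OLD_KNOWN)` skips the unknown tag, while `decode_old_reader(widened_in_place, OLD_KNOWN)` raises `ValueError`, which is the reason the old tag number has to stay reserved rather than be reused with a different type.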
[jira] [Updated] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch
[ https://issues.apache.org/jira/browse/KAFKA-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-14050:
Description:
When testing kraft locally, we encountered this exception from an older client:
{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'api_keys': Error reading array of size 1207959552, only 579 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
    at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
    at org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
    at org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
    at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
    at java.base/java.lang.Thread.run(Thread.java:832)
{code}
The cause appears to be from a change to the type of the `FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to int64: [https://github.com/apache/kafka/pull/9001/files#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bR58].

Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by creating a new field. We will have to leave the existing tag in the protocol spec and consider it dead.

Credit for this find goes to [~dajac].

was:
When testing kraft locally, we encountered this exception from an older client:
{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'api_keys': Error reading array of size 1207959552, only 579 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
    at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
    at org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
    at org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
    at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
    at java.base/java.lang.Thread.run(Thread.java:832)
{code}
The cause appears to be from a change to the type of the `FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to int64:

Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by creating a new field. We will have to leave the existing tag in the protocol spec and consider it dead.

Credit for this find goes to [~dajac].

> Older clients cannot deserialize ApiVersions response with finalized feature epoch
> Key: KAFKA-14050
> URL: https://issues.apache.org/jira/browse/KAFKA-14050
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Priority: Blocker
> Fix For: 3.3.0
[jira] [Updated] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch
[ https://issues.apache.org/jira/browse/KAFKA-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-14050:
Description:
When testing kraft locally, we encountered this exception from an older client:
{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'api_keys': Error reading array of size 1207959552, only 579 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
    at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
    at org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
    at org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
    at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
    at java.base/java.lang.Thread.run(Thread.java:832)
{code}
The cause appears to be from a change to the type of the `FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to int64: [https://github.com/apache/kafka/pull/9001/files#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bR58].

Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by creating a new field. We will have to leave the existing tag in the protocol spec and consider it dead.

Credit for this find goes to [~dajac].

> Older clients cannot deserialize ApiVersions response with finalized feature epoch
> Key: KAFKA-14050
> URL: https://issues.apache.org/jira/browse/KAFKA-14050
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Priority: Blocker
> Fix For: 3.3.0
[jira] [Updated] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch
[ https://issues.apache.org/jira/browse/KAFKA-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-14050:
Description:
When testing kraft locally, we encountered this exception from an older client:
{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'api_keys': Error reading array of size 1207959552, only 579 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
    at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
    at org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
    at org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
    at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
    at java.base/java.lang.Thread.run(Thread.java:832)
{code}
The cause appears to be from a change to the type of the `FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to int64:

Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by creating a new field. We will have to leave the existing tag in the protocol spec and consider it dead.

Credit for this find goes to [~dajac].

was:
When testing kraft locally, we encountered this exception from an older client:
{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'api_keys': Error reading array of size 1207959552, only 579 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
    at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
    at org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
    at org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
    at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
    at java.base/java.lang.Thread.run(Thread.java:832)
{code}
The cause appears to be from a change to the type of the `FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to int64: [https://github.com/confluentinc/ce-kafka/commit/fb4f297207ef62f71e4a6d2d0dac75752933043d#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bL58].

Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by creating a new field. We will have to leave the existing tag in the protocol spec and consider it dead.

Credit for this find goes to [~dajac].

> Older clients cannot deserialize ApiVersions response with finalized feature epoch
> Key: KAFKA-14050
> URL: https://issues.apache.org/jira/browse/KAFKA-14050
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Priority: Blocker
> Fix For: 3.3.0
[jira] [Updated] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch
[ https://issues.apache.org/jira/browse/KAFKA-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-14050: Description: When testing kraft locally, we encountered this exception from an older client: {code:java} [ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1394': org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'api_keys': Error reading array of size 1207959552, only 579 bytes available at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118) at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378) at org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187) at org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333) at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260) at java.base/java.lang.Thread.run(Thread.java:832) {code} The cause appears to be from a change to the type of the `FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to int64: [https://github.com/confluentinc/ce-kafka/commit/fb4f297207ef62f71e4a6d2d0dac75752933043d#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bL58.] Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by creating a new field. 
We will have to leave the existing tag in the protocol spec and consider it dead. Credit for this find goes to [~dajac] .
[jira] [Created] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch
Jason Gustafson created KAFKA-14050:
---

Summary: Older clients cannot deserialize ApiVersions response with finalized feature epoch
Key: KAFKA-14050
URL: https://issues.apache.org/jira/browse/KAFKA-14050
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson
Fix For: 3.3.0

When testing kraft locally, we encountered this exception from an older client:

{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'api_keys': Error reading array of size 1207959552, only 579 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
    at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
    at org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
    at org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
    at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
    at java.base/java.lang.Thread.run(Thread.java:832)
{code}

The cause appears to be a change to the type of the `FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to int64: [https://github.com/confluentinc/ce-kafka/commit/fb4f297207ef62f71e4a6d2d0dac75752933043d#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bL58.]

Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by creating a new field. We will have to leave the existing tag in the protocol spec and consider it dead.

Credit for this find goes to [~dajac] .

--
This message was sent by Atlassian Jira (v8.20.10#820010)
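To make the failure mode above concrete, here is a minimal sketch of what can happen when a writer widens a field from int32 to int64 while a reader still uses the old width. It is not Kafka's actual serialization code: the class `WidenedFieldDemo`, its methods, and the two-field layout (an epoch followed by an array length) are assumptions made purely for illustration, and the values 42 and 3 are arbitrary.

```java
import java.nio.ByteBuffer;

public class WidenedFieldDemo {

    // Hypothetical writer after the type change: an int64 epoch followed by an int32 array length.
    static ByteBuffer encode(long epoch, int arrayLength) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
        buf.putLong(epoch).putInt(arrayLength);
        buf.flip();
        return buf;
    }

    // Reader that knows the epoch is int64: consumes 8 bytes, then reads the real length.
    static int lengthSeenByNewReader(long epoch, int arrayLength) {
        ByteBuffer buf = encode(epoch, arrayLength);
        buf.getLong();
        return buf.getInt();
    }

    // Reader that still expects int32: consumes only 4 bytes of the epoch,
    // so the "length" it reads next is really the low half of the epoch.
    static int lengthSeenByOldReader(long epoch, int arrayLength) {
        ByteBuffer buf = encode(epoch, arrayLength);
        buf.getInt();
        return buf.getInt();
    }

    public static void main(String[] args) {
        System.out.println(lengthSeenByNewReader(42L, 3)); // prints 3
        System.out.println(lengthSeenByOldReader(42L, 3)); // prints 42
    }
}
```

The old reader ends up four bytes out of step with the writer, so every later value it reads is taken from the wrong position. An implausible array size of the kind shown in the exception is consistent with this sort of misalignment, though the sketch does not reproduce the exact bytes involved. Introducing a new tag for the int64 value, as the issue proposes, leaves the bytes under the old tag unchanged for readers that still expect int32.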
[GitHub] [kafka] jsancio commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log
jsancio commented on PR #12274: URL: https://github.com/apache/kafka/pull/12274#issuecomment-1176530495 > In this PR I tried your suggestion and it does solve this problem, however, this will make the logic in RaftClient very complex and we need to save more states in LeaderState and it's also difficult to test @dengziming Do you have a diff for this solution? I am interested in this solution as it would work in both REMOTE and COLOCATED configuration for KRaft.
[GitHub] [kafka] jsancio commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log
jsancio commented on PR #12274: URL: https://github.com/apache/kafka/pull/12274#issuecomment-1176512896 > > here is the code change: https://github.com/dengziming/kafka/tree/KAFKA-13959-2 > > Hey @dengziming, I took a look at the commits in this tree. Is this the only commit [dengziming@79dc8ec](https://github.com/dengziming/kafka/commit/79dc8ec423cd74fba462e934f89bdec3dcd8528d)? Can you maybe share a diff/compare? For example, something like [dengziming/kafka@30216ea...KAFKA-13959-2](https://github.com/dengziming/kafka/compare/30216ea1c58761e62f51af40033f24e3ae1c5c2a...KAFKA-13959-2) Never mind. I understand now. The broker sends the active controller the local LEO instead of the last offset applied by the broker listener. I think this will unfence the broker at startup even if the broker hasn't applied the snapshot or any of the log records, right?
[GitHub] [kafka] jsancio commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log
jsancio commented on PR #12274: URL: https://github.com/apache/kafka/pull/12274#issuecomment-1176510158 > here is the code change: https://github.com/dengziming/kafka/tree/KAFKA-13959-2 Hey @dengziming, I took a look at the commits in this tree. Is this the only commit https://github.com/dengziming/kafka/commit/79dc8ec423cd74fba462e934f89bdec3dcd8528d? Can you maybe share a diff/compare? For example, something like https://github.com/dengziming/kafka/compare/30216ea1c58761e62f51af40033f24e3ae1c5c2a...KAFKA-13959-2
[GitHub] [kafka] lihaosky commented on pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock
lihaosky commented on PR #12166: URL: https://github.com/apache/kafka/pull/12166#issuecomment-1176495887 I can also take a look by end of this week.
[GitHub] [kafka] mjsax commented on a diff in pull request #12383: REVERT: Kip-770
mjsax commented on code in PR #12383: URL: https://github.com/apache/kafka/pull/12383#discussion_r915090310 ## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ## @@ -1256,44 +1255,6 @@ public void shouldThrowExceptionWhenClientTagValueExceedMaxLimit() { ); } -@Test -@SuppressWarnings("deprecation") -public void shouldUseStateStoreCacheMaxBytesWhenBothOldAndNewConfigsAreSet() { -props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100); -props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10); -final StreamsConfig config = new StreamsConfig(props); -assertEquals(getTotalCacheSize(config), 100); -} - -@Test -@SuppressWarnings("deprecation") -public void shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() { -props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10); -final StreamsConfig config = new StreamsConfig(props); -assertEquals(getTotalCacheSize(config), 10); -} - -@Test -public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() { -props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10); -final StreamsConfig config = new StreamsConfig(props); -assertEquals(getTotalCacheSize(config), 10); -} - -@Test -public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() { -final StreamsConfig config = new StreamsConfig(props); -assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024); -} - -@Test -public void testInvalidSecurityProtocol() { -props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc"); -final ConfigException ce = assertThrows(ConfigException.class, -() -> new StreamsConfig(props)); - assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); -} - Review Comment: Good catch!
[GitHub] [kafka] mjsax commented on a diff in pull request #12383: REVERT: Kip-770
mjsax commented on code in PR #12383: URL: https://github.com/apache/kafka/pull/12383#discussion_r915088428 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java: ## @@ -55,7 +55,6 @@ public class RecordQueue { private final Sensor droppedRecordsSensor; private final Sensor consumedSensor; -private long totalBytesBuffered; private long headRecordSizeInBytes; Review Comment: I did keep this one, because there was a follow up PR that depends on it, and I did not want to revert (or change) the other one. https://github.com/apache/kafka/commit/a6c5a74fdbdce9a992b47706913c920902cda28c
[GitHub] [kafka] MPeli commented on pull request #12331: KAFKA-1194: changes needed to run on Windows
MPeli commented on PR #12331: URL: https://github.com/apache/kafka/pull/12331#issuecomment-1176373885 @divijvaidya, thank you for your help. The JDK 17 and Scala 2.13 build and tests finished successfully. Can we now request a committer to review this PR?
[GitHub] [kafka] vamossagar12 commented on pull request #12309: KAFKA-14007: Invoking connect headers.close method on shutdown
vamossagar12 commented on PR #12309: URL: https://github.com/apache/kafka/pull/12309#issuecomment-1176357937 @showuon , can you plz review the changes? Thanks!
[GitHub] [kafka] vamossagar12 commented on pull request #12321: KAFKA-14012: Adding null checks for cases when closeQuietly was being passed a lambda object
vamossagar12 commented on PR #12321: URL: https://github.com/apache/kafka/pull/12321#issuecomment-1176357390 @showuon , could you plz review these changes whenever you get the chance?
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12383: REVERT: Kip-770
vamossagar12 commented on code in PR #12383: URL: https://github.com/apache/kafka/pull/12383#discussion_r914702189 ## streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java: ## @@ -137,54 +129,19 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG); log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize); } else { -// If the user hasn't explicitly set the buffered.records.per.partition config, then leave it unbounded -// and rely on the input.buffer.max.bytes instead to keep the memory usage under control -maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) -? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1; +maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG); } -final boolean stateStoreCacheMaxBytesOverridden = isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides); -final boolean cacheMaxBytesBufferingOverridden = isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides); - -if (!stateStoreCacheMaxBytesOverridden && !cacheMaxBytesBufferingOverridden) { -cacheSize = getTotalCacheSize(globalAppConfigs); +if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) { +cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); +log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize); } else { -if (stateStoreCacheMaxBytesOverridden && cacheMaxBytesBufferingOverridden) { -cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG); -log.info("Topology {} is using both deprecated config {} and new config {}, hence {} is ignored and the new config {} (value {}) is used", - topologyName, - CACHE_MAX_BYTES_BUFFERING_CONFIG, - STATESTORE_CACHE_MAX_BYTES_CONFIG, - CACHE_MAX_BYTES_BUFFERING_CONFIG, - 
STATESTORE_CACHE_MAX_BYTES_CONFIG, - cacheSize); -} else if (cacheMaxBytesBufferingOverridden) { -cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); -log.info("Topology {} is using only deprecated config {}, and will be used to set cache size to {}; " + - "we suggest setting the new config {} instead as deprecated {} would be removed in the future.", - topologyName, - CACHE_MAX_BYTES_BUFFERING_CONFIG, - cacheSize, - STATESTORE_CACHE_MAX_BYTES_CONFIG, - CACHE_MAX_BYTES_BUFFERING_CONFIG); -} else { -cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG); -} - -if (cacheSize != 0) { -log.warn("Topology {} is overriding cache size to {} but this will not have any effect as the " - + "topology-level cache size config only controls whether record buffering is enabled " - + "or disabled, thus the only valid override value is 0", - topologyName, cacheSize); -} else { -log.info("Topology {} is overriding cache size to {}, record buffering will be disabled", - topologyName, cacheSize); -} +cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); } if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) { maxTaskIdleMs = getLong(MAX_TASK_IDLE_MS_CONFIG); -log.info("Topology {} is overriding {} to {}", topologyName, MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs); +log.info("Topology {} is overridding {} to {}", topologyName, MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs); Review Comment: nit: typo in overriding.. I think that's how it was originally. 
Comment can be ignored :) ## streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java: ## @@ -55,7 +55,6 @@ public class RecordQueue { private final Sensor droppedRecordsSensor; private final Sensor consumedSensor; -private long totalBytesBuffered; private long headRecordSizeInBytes; Review Comment: I think headRecordSizeInBytes was also added as part of the PR which should be removed: https://github.com/apache/kafka/pull/11796/files#diff-2c19d764cad8fcbe7da8046cf0a01e525bc41a5e12e08e8c71d76c0f27ffc550R56 ## streams/src/te
[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure
elkkhan commented on code in PR #12320: URL: https://github.com/apache/kafka/pull/12320#discussion_r914831552 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java: ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.connect.runtime.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.jose4j.keys.HmacKey; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import javax.ws.rs.core.Response; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(Enclosed.class) +public class RestClientTest { + +private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); +private static final TypeReference TEST_TYPE = new TypeReference() { +}; + +private static void assertIsInternalServerError(ConnectRestException e) { +assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode()); +assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.errorCode()); +} + +private static RestClient.HttpResponse httpRequest(HttpClient httpClient, String 
requestSignatureAlgorithm) { +return RestClient.httpRequest( +httpClient, +"https://localhost:1234/api/endpoint", +"GET", +null, +new TestDTO("requestBodyData"), +TEST_TYPE, +new HmacKey("HMAC".getBytes(StandardCharsets.UTF_8)), +requestSignatureAlgorithm); +} + +private static RestClient.HttpResponse httpRequest(HttpClient httpClient) { +String validRequestSignatureAlgorithm = "HmacMD5"; +return httpRequest(httpClient, validRequestSignatureAlgorithm); +} + + +@RunWith(Parameterized.class) +public static class RequestFailureParameterizedTest { +private static final HttpClient httpClient = mock(HttpClient.class); Review Comment: `@Mock` can't be used since it needs a `MockitoJUnitRunner` - we already have the `Parameterized` runner here and we can't have multiple runners on a class. Good point about the `static` mock, I missed that. `Any static field saves its state for the duration of the JVM's execution (unless code changes its value, of course). JUnit uses one JVM for all of its tests, so, yes, static fields save state between tests.` Changing the field to non-static.
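The point about the `static` mock can be illustrated with a small, dependency-free sketch. It does not use Mockito or JUnit: `RecordingStub` is a hypothetical stand-in for a mock that records interactions, and the two nested "test" classes only mimic the way JUnit 4 creates a fresh instance of the test class for each test method. All names here are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class StaticStateDemo {

    // Hypothetical stand-in for a mock that records the calls made on it.
    static class RecordingStub {
        final List<String> calls = new ArrayList<>();
        void call(String name) { calls.add(name); }
    }

    // Mimics a test class that keeps its collaborator in a static field.
    static class StaticFieldTest {
        static final RecordingStub STUB = new RecordingStub();
        int run() { STUB.call("request"); return STUB.calls.size(); }
    }

    // Mimics a test class that keeps its collaborator in an instance field.
    static class InstanceFieldTest {
        final RecordingStub stub = new RecordingStub();
        int run() { stub.call("request"); return stub.calls.size(); }
    }

    // Runs two "tests" on fresh instances and returns how many calls the second one
    // observes, relative to the state of the shared stub before this method ran.
    static int callsSeenBySecondStaticTest() {
        int before = StaticFieldTest.STUB.calls.size();
        new StaticFieldTest().run();
        return new StaticFieldTest().run() - before;
    }

    // Same scenario with an instance field: each "test" starts from a clean stub.
    static int callsSeenBySecondInstanceTest() {
        new InstanceFieldTest().run();
        return new InstanceFieldTest().run();
    }

    public static void main(String[] args) {
        System.out.println(callsSeenBySecondStaticTest());   // prints 2
        System.out.println(callsSeenBySecondInstanceTest()); // prints 1
    }
}
```

With the static field, the second "test" sees the interaction recorded by the first, which is the kind of leakage between test cases that moving the mock to a non-static field avoids.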
[GitHub] [kafka] mumrah commented on pull request #12380: MINOR: Get rid of agent checks in Jenkinsfile
mumrah commented on PR #12380: URL: https://github.com/apache/kafka/pull/12380#issuecomment-1176265122 We can probably do without these agent checks. The idea behind them was to fail-fast when the node provider wasn't available (rather than waiting for the 2hr timeout). But, the end result is pretty much the same (aborted build status). I wonder if we can set up a different Jenkins job that runs on trunk only for ARM and PPC. I doubt we have gained much from running these builds on each PR. Seems they mainly add noise to our build status.
[GitHub] [kafka] anekee666 commented on pull request #6329: KAFKA-1194: Fix renaming open files on Windows
anekee666 commented on PR #6329: URL: https://github.com/apache/kafka/pull/6329#issuecomment-1176258274 Hi, there seems to be a leak in the Kafka log segments area after the changes: ![image-2022-06-28-14-41-43-339](https://user-images.githubusercontent.com/98586737/177567524-c4fecb96-d84b-4683-bac3-347a44da5523.png) The memory increases over time, but I still haven't found the reason. Any idea?
[GitHub] [kafka] elkkhan commented on pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure
elkkhan commented on PR #12320: URL: https://github.com/apache/kafka/pull/12320#issuecomment-1176251858 @C0urante addressed the comments, I think the only outstanding one is whether we should add junit-jupiter-params to dep. list and use that for parameterised tests
[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure
elkkhan commented on code in PR #12320: URL: https://github.com/apache/kafka/pull/12320#discussion_r914862459 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java: ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.connect.runtime.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.jose4j.keys.HmacKey; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import javax.ws.rs.core.Response; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(Enclosed.class) +public class RestClientTest { + +private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); +private static final TypeReference TEST_TYPE = new TypeReference() { +}; + +private static void assertIsInternalServerError(ConnectRestException e) { +assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode()); +assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.errorCode()); +} + +private static RestClient.HttpResponse httpRequest(HttpClient httpClient, String 
requestSignatureAlgorithm) { +return RestClient.httpRequest( +httpClient, +"https://localhost:1234/api/endpoint", +"GET", +null, +new TestDTO("requestBodyData"), +TEST_TYPE, +new HmacKey("HMAC".getBytes(StandardCharsets.UTF_8)), Review Comment: replaced with a mock key
[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure
elkkhan commented on code in PR #12320: URL: https://github.com/apache/kafka/pull/12320#discussion_r914831552 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java: ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.connect.runtime.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.jose4j.keys.HmacKey; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import javax.ws.rs.core.Response; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(Enclosed.class) +public class RestClientTest { + +private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); +private static final TypeReference TEST_TYPE = new TypeReference() { +}; + +private static void assertIsInternalServerError(ConnectRestException e) { +assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode()); +assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.errorCode()); +} + +private static RestClient.HttpResponse httpRequest(HttpClient httpClient, String 
requestSignatureAlgorithm) { +return RestClient.httpRequest( +httpClient, +"https://localhost:1234/api/endpoint", +"GET", +null, +new TestDTO("requestBodyData"), +TEST_TYPE, +new HmacKey("HMAC".getBytes(StandardCharsets.UTF_8)), +requestSignatureAlgorithm); +} + +private static RestClient.HttpResponse httpRequest(HttpClient httpClient) { +String validRequestSignatureAlgorithm = "HmacMD5"; +return httpRequest(httpClient, validRequestSignatureAlgorithm); +} + + +@RunWith(Parameterized.class) +public static class RequestFailureParameterizedTest { +private static final HttpClient httpClient = mock(HttpClient.class); Review Comment: `@Mock` can't be used here because it needs a `MockitoJUnitRunner`, but this class already uses the `Parameterized` runner and a class can't have multiple runners. Good point about the `static` mock, I missed that. `Any static field saves its state for the duration of the JVM's execution (unless code changes its value, of course). JUnit uses one JVM for all of its tests, so, yes, static fields save state between tests.` I am changing the field to non-static.
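The point about the `static` mock can be illustrated without JUnit or Mockito. The following is only a minimal sketch in plain Java: `RecordingClient` and `StaticVersusInstanceState` are hypothetical names that do not appear in the PR, and `runOneCase()` merely imitates a runner that constructs a fresh test-class instance for each test case. It shows that state recorded on a static field accumulates across cases, while an instance field starts clean each time.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a mock that records the calls made on it. */
final class RecordingClient {
    private final List<String> calls = new ArrayList<>();

    void send(String request) {
        calls.add(request);
    }

    int callCount() {
        return calls.size();
    }
}

/** Hypothetical "test class"; assumed to be instantiated once per test case by the runner. */
final class StaticVersusInstanceState {
    // Shared by every instance for the lifetime of the JVM.
    static final RecordingClient SHARED = new RecordingClient();

    // Recreated whenever a new instance is constructed.
    final RecordingClient perInstance = new RecordingClient();

    /** Imitates one test case; returns {sharedCalls, perInstanceCalls} observed afterwards. */
    static int[] runOneCase() {
        StaticVersusInstanceState testInstance = new StaticVersusInstanceState();
        SHARED.send("GET");
        testInstance.perInstance.send("GET");
        return new int[] {SHARED.callCount(), testInstance.perInstance.callCount()};
    }

    public static void main(String[] args) {
        int[] first = runOneCase();
        int[] second = runOneCase();
        System.out.println("first:  shared=" + first[0] + " perInstance=" + first[1]);
        System.out.println("second: shared=" + second[0] + " perInstance=" + second[1]);
    }
}
```

With a real mock the leaked state would be recorded invocations and stubbings rather than a simple counter, which is why a non-static field (or an explicit reset) keeps parameterized cases independent.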
[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure
elkkhan commented on code in PR #12320: URL: https://github.com/apache/kafka/pull/12320#discussion_r914821168 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java: ## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.connect.runtime.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import javax.ws.rs.core.Response; +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.niceMock; +import static org.easymock.EasyMock.replay; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class RestClientTest { + +private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); +private static final TypeReference TEST_TYPE = new TypeReference() { +}; +private HttpClient httpClient; + +private static String toJsonString(Object obj) { +try { +return OBJECT_MAPPER.writeValueAsString(obj); +} catch (JsonProcessingException e) { +throw new RuntimeException(e); +} +} + +private static RestClient.HttpResponse httpRequest(HttpClient httpClient, TypeReference typeReference) { +return RestClient.httpRequest( +httpClient, null, null, null, null, typeReference, null, null); +} + +@BeforeEach +public void mockSetup() { +httpClient = niceMock(HttpClient.class); +} + +@Test +public 
void testSuccess() throws ExecutionException, InterruptedException, TimeoutException { +int statusCode = Response.Status.OK.getStatusCode(); +String expectedResponse = toJsonString(new TestDTO("someContent")); +setupHttpClient(statusCode, expectedResponse); + +RestClient.HttpResponse httpResp = httpRequest(httpClient, TEST_TYPE); +assertEquals(httpResp.status(), statusCode); +assertEquals(toJsonString(httpResp.body()), expectedResponse); +} + +@Test +public void testNoContent() throws ExecutionException, InterruptedException, TimeoutException { +int statusCode = Response.Status.NO_CONTENT.getStatusCode(); +setupHttpClient(statusCode, null); + +RestClient.HttpResponse httpResp = httpRequest(httpClient, TEST_TYPE); +assertEquals(httpResp.status(), statusCode); +assertNull(httpResp.body()); +} + +@Test +public void testError() throws ExecutionException, InterruptedException, TimeoutException { Review Comment: @C0urante Would it be too much overhead to add junit-jupiter-params to the classpath? If not, I'd be happy to refactor this to use JUnit 5 parameterized tests; otherwise, I think it can be left as is.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12381: KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection
divijvaidya commented on code in PR #12381: URL: https://github.com/apache/kafka/pull/12381#discussion_r914730050 ## core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala: ## @@ -429,6 +430,20 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup verifyProduceConsume(producer, consumer, 10, topic) } +def verifyBrokerToControllerCall(controller: KafkaServer): Unit = { + val nonControllerBroker = servers.find(_.config.brokerId != controller.config.brokerId).get + val brokerToControllerManager = nonControllerBroker.clientToControllerChannelManager + val completionHandler = new TestControllerRequestCompletionHandler() + brokerToControllerManager.sendRequest(new MetadataRequest.Builder(new MetadataRequestData()), completionHandler) + TestUtils.waitUntilTrue(() => { +completionHandler.completed.get() || completionHandler.timedOut.get() + }, "Timed out while waiting for broker to controller API call") + val response = completionHandler.actualResponse.getOrElse(throw new IllegalStateException("No response recorded even though request is completed")) Review Comment: Both your comments make sense. I have made the changes as per your suggestions. ## core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala: ## @@ -189,6 +188,10 @@ class BrokerToControllerChannelManagerImpl( config.saslInterBrokerHandshakeRequestEnable, logContext ) + channelBuilder match { +case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable) +case _ => Review Comment: Removed.
[jira] [Comment Edited] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563162#comment-17563162 ] Matthew de Detrich edited comment on KAFKA-14014 at 7/6/22 11:27 AM: - So I think I have gotten somewhere. I tried adjusting various timeouts to no avail, so I moved on to another theory: some topology/stream based methods are asynchronous, i.e. they are still doing something in the background even after the calling function has finished. To test this theory I have adjusted the flaky test by adding some manual Thread.sleep calls to account for this supposed asynchronicity, i.e.
{code:java}
final AddNamedTopologyResult result1 = streams.addNamedTopology(topology2Client1);
Thread.sleep(500);
streams2.addNamedTopology(topology2Client2).all().get();
Thread.sleep(500);
result1.all().get();
Thread.sleep(500);
{code}
This appears to be greatly reducing the flakiness. Originally I used 200 millis, which roughly increased the amount of time until failure by 4x; however, I still got a failure, so now I am testing it with the 500 millis shown above.
was (Author: mdedetrich-aiven): So I think I have gotten somewhere. I tried adjusting various timeouts to no avail so I moved onto another theory which is that some topology/stream based methods are asynchronous, i.e they are doing something in the background even after the calling function has finished. To test this theory I have adjusted the flaky test by adding some manual sleeps, i.e. {code:java} final AddNamedTopologyResult result1 = streams.addNamedTopology(topology2Client1); Thread.sleep(500); streams2.addNamedTopology(topology2Client2).all().get(); Thread.sleep(500); result1.all().get(); Thread.sleep(500);{code} This appears to be greatly reducing the flakiness. Originally I used 200 millis which roughly increased the amount of time until failure by 4x however I still got a failure so now I am testing it with the 500 millis as shown above. 
> Flaky test > NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets() > > > Key: KAFKA-14014 > URL: https://issues.apache.org/jira/browse/KAFKA-14014 > Project: Kafka > Issue Type: Test > Components: streams >Reporter: Bruno Cadonna >Priority: Critical > Labels: flaky-test > > {code:java} > java.lang.AssertionError: > Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]> > but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base/java.lang.Thread.run(Thread.java:833) > {code} > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/ > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldA
[jira] [Comment Edited] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563004#comment-17563004 ] Matthew de Detrich edited comment on KAFKA-14014 at 7/6/22 11:24 AM: - [~cadonna] So I did some debugging on this ticket over the past week and I found out some interesting things. To start off with, I did manage to predictably replicate the test's flakiness and it does appear to be related to load, i.e. the test is more flaky the fewer CPU resources it has. I am using docker (i.e. running the tests within the docker gradle image) with the --cpu flag to limit resources. Interestingly, I have gone up to 5 CPUs and it's still flaking out, albeit less often. I attempted to increase the various timeouts that are used in the test, but this had no effect, so I am going to dig a bit further. Note that 5 CPUs is already considered "high" (at least for a machine that would reasonably run kafka streams). The machine I am running on has 12 "CPUs" (6 cores, 12 threads), and at least when running with all of the resources on the machine I couldn't replicate the flaky test.
was (Author: mdedetrich-aiven): [~cadonna] So I did some debugging on this ticket over the past week and I found out some interesting things. To start off with I did manage to predictably replicate the test's flakiness and it does appear to be related to load, i.e. the test is more flaky the less CPU resources it has. I am using docker (i.e. running the tests within docker gradle image) by using the --cpu flag to limit resources. Interestingly I have gone up to 5 cpu's and its still flaking out albeit less often. I attempted to increase the various timeouts that is used in t he test but this had no effect so I am going to dig a bit further. Note that 5 cpu's is already considered "high" (at least for a machine that would reasonably run kafka streams). 
The machine I am running as 12 "cpus"'s (6 cores, 12 threads) and at least when running with all of the resources on the machine I couldn't replicate the flaky test. > Flaky test > NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets() > > > Key: KAFKA-14014 > URL: https://issues.apache.org/jira/browse/KAFKA-14014 > Project: Kafka > Issue Type: Test > Components: streams >Reporter: Bruno Cadonna >Priority: Critical > Labels: flaky-test > > {code:java} > java.lang.AssertionError: > Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]> > but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base/java.lang.Thread.run(Thread.java:833) > {code} > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/ > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.ka
[jira] [Comment Edited] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563162#comment-17563162 ] Matthew de Detrich edited comment on KAFKA-14014 at 7/6/22 11:23 AM: - So I think I have gotten somewhere. I tried adjusting various timeouts to no avail so I moved onto another theory which is that some topology/stream based methods are asynchronous, i.e they are doing something in the background even after the calling function has finished. To test this theory I have adjusted the flaky test by adding some manual sleeps, i.e. {code:java} final AddNamedTopologyResult result1 = streams.addNamedTopology(topology2Client1); Thread.sleep(500); streams2.addNamedTopology(topology2Client2).all().get(); Thread.sleep(500); result1.all().get(); Thread.sleep(500);{code} This appears to be greatly reducing the flakiness. Originally I used 200 millis which roughly increased the amount of time until failure by 4x however I still got a failure so now I am testing it with the 500 millis as shown above. was (Author: mdedetrich-aiven): So I think I have gotten somewhere. I tried adjusting various timeouts to no avail so I moved onto another theory which is that some topology/stream based methods are asynchronous, i.e they are doing something in the background even after the calling function has finished. To test this theory I have adjusted the flaky test by adding some manual sleeps, i.e. {code:java} final AddNamedTopologyResult result1 = streams.addNamedTopology(topology2Client1); Thread.sleep(500); streams2.addNamedTopology(topology2Client2).all().get(); Thread.sleep(500); result1.all().get(); Thread.sleep(500);{code} This appears to be greatly reducing the flakiness. Originally I used 200 millis which roughly increased the amount of time until failure by 4x and now I am testing it with the 500 millis as shown above. 
> Flaky test > NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets() > > > Key: KAFKA-14014 > URL: https://issues.apache.org/jira/browse/KAFKA-14014 > Project: Kafka > Issue Type: Test > Components: streams >Reporter: Bruno Cadonna >Priority: Critical > Labels: flaky-test > > {code:java} > java.lang.AssertionError: > Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]> > but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base/java.lang.Thread.run(Thread.java:833) > {code} > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/ > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResets
[jira] [Commented] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563162#comment-17563162 ] Matthew de Detrich commented on KAFKA-14014: So I think I have gotten somewhere. I tried adjusting various timeouts to no avail so I moved onto another theory which is that some topology/stream based methods are asynchronous, i.e they are doing something in the background even after the calling function has finished. To test this theory I have adjusted the flaky test by adding some manual sleeps, i.e. {code:java} final AddNamedTopologyResult result1 = streams.addNamedTopology(topology2Client1); Thread.sleep(500); streams2.addNamedTopology(topology2Client2).all().get(); Thread.sleep(500); result1.all().get(); Thread.sleep(500);{code} This appears to be greatly reducing the flakiness. Originally I used 200 millis which roughly increased the amount of time until failure by 4x and now I am testing it with the 500 millis as shown above. 
> Flaky test > NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets() > > > Key: KAFKA-14014 > URL: https://issues.apache.org/jira/browse/KAFKA-14014 > Project: Kafka > Issue Type: Test > Components: streams >Reporter: Bruno Cadonna >Priority: Critical > Labels: flaky-test > > {code:java} > java.lang.AssertionError: > Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]> > but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base/java.lang.Thread.run(Thread.java:833) > {code} > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/ > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
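The fixed `Thread.sleep(500)` calls in the comment above are a diagnostic for the "work continues in the background" theory. If that theory holds, one possible direction is to poll for an observable condition with a deadline instead of sleeping for a fixed time. The following is only a minimal sketch in plain Java: `Await.waitUntil` is a hypothetical helper, not an API taken from Kafka, and which condition the real test should wait on is not established in this thread.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: poll a condition until it holds or a deadline passes. */
final class Await {
    /** Returns true if the condition became true before the timeout, false otherwise. */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // deadline passed without the condition holding
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Stand-in for background work that outlives the call that started it.
        CompletableFuture<Void> background = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        boolean done = waitUntil(background::isDone, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("background work finished: " + done);
    }
}
```

Compared with a fixed sleep, polling returns as soon as the condition holds on a fast machine and tolerates slower ones up to the timeout, which matters when flakiness depends on available CPU.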
[GitHub] [kafka] tyamashi-oss commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically
tyamashi-oss commented on code in PR #12296: URL: https://github.com/apache/kafka/pull/12296#discussion_r914719266 ## core/src/main/scala/kafka/log/LogCleaner.scala: ## @@ -186,11 +186,18 @@ class LogCleaner(initialConfig: CleanerConfig, } /** -* Reconfigure log clean config. This simply stops current log cleaners and creates new ones. +* Reconfigure log clean config. This updates desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond and stops current log cleaners and creates new ones. Review Comment: Thank you. I’ve updated the comment. https://github.com/apache/kafka/pull/12296/commits/f9d27f6181c3fbc1ef02d56904d58793962dedb2 ## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -1854,6 +1853,26 @@ class LogCleanerTest { } finally logCleaner.shutdown() } + @Test + def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = { +val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181") +oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000) + +val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)), + logDirs = Array(TestUtils.tempDir()), + logs = new Pool[TopicPartition, UnifiedLog](), + logDirFailureChannel = new LogDirFailureChannel(1), + time = time) + +assertEquals(logCleaner.throttler.desiredRatePerSec, 1000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp") + +val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181") +newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 2000) + +logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps)) Review Comment: Your concern is correct. Thank you. LogCleaner.shutdown() should be called at the end of the test because kafka-log-cleaner-thread-x threads are created in LogCleaner.startup() at the end of LogCleaner.reconfigure(), and those threads keep running after the test finishes. 
I appended LogCleaner.shutdown() to the end of the test and also used a LogCleaner with empty startup() and shutdown() implementations. This makes the test somewhat more white-box, since it depends on how LogCleaner.reconfigure() is implemented, but I couldn't think of another way. Please let me know if you have one. https://github.com/apache/kafka/pull/12296/commits/e05707d5613d096df06c4e96085deb481b00a7e7
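The behavior discussed in this review (reconfigure updates the throttler's desired rate, then stops and restarts the cleaners, so a test must shut the cleaner down afterwards) can be sketched in plain Java. This is a simplified illustration under stated assumptions, not Kafka's Scala implementation: `SketchThrottler` and `SketchCleaner` are hypothetical classes, and the thread counter only stands in for real cleaner threads.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical throttler whose target rate can be changed while it is in use. */
final class SketchThrottler {
    private volatile double desiredRatePerSec;

    SketchThrottler(double desiredRatePerSec) {
        this.desiredRatePerSec = desiredRatePerSec;
    }

    double desiredRatePerSec() {
        return desiredRatePerSec;
    }

    void updateDesiredRatePerSec(double newRate) {
        desiredRatePerSec = newRate;
    }
}

/** Hypothetical cleaner: reconfigure() updates the throttler, then restarts its workers. */
final class SketchCleaner {
    final SketchThrottler throttler;
    private final AtomicInteger runningThreads = new AtomicInteger();

    SketchCleaner(double maxBytesPerSec) {
        this.throttler = new SketchThrottler(maxBytesPerSec);
    }

    void startup() {
        runningThreads.incrementAndGet(); // stands in for starting cleaner threads
    }

    void shutdown() {
        runningThreads.set(0); // stands in for stopping them
    }

    int runningThreads() {
        return runningThreads.get();
    }

    void reconfigure(double newMaxBytesPerSec) {
        throttler.updateDesiredRatePerSec(newMaxBytesPerSec);
        shutdown();
        startup(); // workers are running again, so the caller must shut down eventually
    }

    public static void main(String[] args) {
        SketchCleaner cleaner = new SketchCleaner(1000);
        try {
            cleaner.reconfigure(2000);
            System.out.println("rate after reconfigure: " + cleaner.throttler.desiredRatePerSec());
        } finally {
            cleaner.shutdown(); // mirrors the shutdown added at the end of the test
        }
        System.out.println("threads after shutdown: " + cleaner.runningThreads());
    }
}
```

The `try`/`finally` in `main` reflects the reviewer's concern: because `reconfigure()` ends by starting workers again, cleanup belongs in a block that runs even when an assertion fails.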
[GitHub] [kafka] tyamashi-oss commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically
tyamashi-oss commented on code in PR #12296: URL: https://github.com/apache/kafka/pull/12296#discussion_r914719051 ## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -1854,6 +1853,26 @@ class LogCleanerTest { } finally logCleaner.shutdown() } + @Test + def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = { +val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181") +oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000) + +val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)), + logDirs = Array(TestUtils.tempDir()), + logs = new Pool[TopicPartition, UnifiedLog](), + logDirFailureChannel = new LogDirFailureChannel(1), + time = time) + +assertEquals(logCleaner.throttler.desiredRatePerSec, 1000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp") Review Comment: Thank you. I’ve updated the assert method parameters and the error message. https://github.com/apache/kafka/pull/12296/commits/e05707d5613d096df06c4e96085deb481b00a7e7 ## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -1854,6 +1853,26 @@ class LogCleanerTest { } finally logCleaner.shutdown() } + @Test + def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = { +val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181") +oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000) + +val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)), + logDirs = Array(TestUtils.tempDir()), + logs = new Pool[TopicPartition, UnifiedLog](), + logDirFailureChannel = new LogDirFailureChannel(1), + time = time) + +assertEquals(logCleaner.throttler.desiredRatePerSec, 1000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp") + +val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181") +newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 2000) + 
+logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps)) + +assertEquals(logCleaner.throttler.desiredRatePerSec, 2000, "Throttler.desiredRatePerSec should be updated with new KafkaConfig.LogCleanerIoMaxBytesPerSecondProp") Review Comment: Thank you. I’ve updated the error message. https://github.com/apache/kafka/pull/12296/commits/e05707d5613d096df06c4e96085deb481b00a7e7 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
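The behaviour that the test above checks (a throttle rate that is initialized from configuration and can later be replaced at runtime) can be sketched roughly as follows. This is only an illustration: `SimpleThrottler`, `updateDesiredRatePerSec` and `sleepMsFor` are hypothetical names chosen for this sketch and are not taken from the PR or from Kafka's own throttler.

```java
/** Hypothetical, simplified throttler; an assumption for illustration, not Kafka's class. */
final class SimpleThrottler {
    // volatile so that a rate change made by a reconfiguring thread
    // becomes visible to the threads that are being throttled
    private volatile double desiredRatePerSec;

    SimpleThrottler(double desiredRatePerSec) {
        this.desiredRatePerSec = desiredRatePerSec;
    }

    double desiredRatePerSec() {
        return desiredRatePerSec;
    }

    /** Replaces the target rate, e.g. after a dynamic configuration change. */
    void updateDesiredRatePerSec(double newRatePerSec) {
        this.desiredRatePerSec = newRatePerSec;
    }

    /**
     * Returns how many milliseconds to sleep so that `bytes` observed over
     * `elapsedMs` do not exceed the desired rate (0 if already slow enough).
     */
    long sleepMsFor(long bytes, long elapsedMs) {
        double targetMs = bytes / desiredRatePerSec * 1000.0;
        return Math.max(0L, Math.round(targetMs - elapsedMs));
    }
}
```

With a rate of 1000 bytes/s, 2000 bytes seen in one second would require a further second of sleep; after raising the rate to 2000 bytes/s the same observation requires none.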
[GitHub] [kafka] cadonna commented on a diff in pull request #12384: KAFKA-10199: Add methods to add and remove tasks to task manager
cadonna commented on code in PR #12384: URL: https://github.com/apache/kafka/pull/12384#discussion_r914710195 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ## @@ -237,14 +237,6 @@ Task activeTasksForInputPartition(final TopicPartition partition) { return activeTasksPerPartition.get(partition); } -// TODO: change return type to `StandbyTask` -Task standbyTask(final TaskId taskId) { -if (!standbyTasksPerId.containsKey(taskId)) { -throw new IllegalStateException("Standby task unknown: " + taskId); -} -return standbyTasksPerId.get(taskId); -} - Review Comment: Not used anywhere.
[GitHub] [kafka] cadonna commented on a diff in pull request #12384: KAFKA-10199: Add methods to add and remove tasks to task manager
cadonna commented on code in PR #12384: URL: https://github.com/apache/kafka/pull/12384#discussion_r914709967 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ## @@ -283,24 +280,6 @@ Collection notPausedTasks() { .collect(Collectors.toList()); } -Set activeTaskIds() { -return readOnlyActiveTaskIds; -} - -Set standbyTaskIds() { -return readOnlyStandbyTaskIds; -} - -// TODO: change return type to `StreamTask` -Map activeTaskMap() { -return readOnlyActiveTasksPerId; -} - -// TODO: change return type to `StandbyTask` -Map standbyTaskMap() { -return readOnlyStandbyTasksPerId; -} - Review Comment: Those methods are not used anywhere.
[GitHub] [kafka] cadonna commented on a diff in pull request #12384: KAFKA-10199: Add methods to add and remove tasks to task manager
cadonna commented on code in PR #12384: URL: https://github.com/apache/kafka/pull/12384#discussion_r914709611 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -798,12 +798,12 @@ private void closeTaskDirty(final Task task) { } catch (final RuntimeException swallow) { log.error("Error suspending dirty task {} ", task.id(), swallow); } -tasks.removeTaskBeforeClosing(task.id()); +tasks.removeTask(task.id()); Review Comment: Renaming to a more appropriate name.
[GitHub] [kafka] cadonna commented on a diff in pull request #12384: KAFKA-10199: Add methods to add and remove tasks to task manager
cadonna commented on code in PR #12384: URL: https://github.com/apache/kafka/pull/12384#discussion_r914709320 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java: ## @@ -82,7 +82,7 @@ int process(final int maxNumRecords, final Time time) { } } catch (final Throwable t) { taskExecutionMetadata.registerTaskError(task, t, now); - tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed); + tasks.removeTaskFromSuccessfullyProcessedBeforeClosing(lastProcessed); Review Comment: Just a typo
[GitHub] [kafka] cadonna merged pull request #12379: KAFKA-10199: Remove call to Task#completeRestoration from state updater
cadonna merged PR #12379: URL: https://github.com/apache/kafka/pull/12379
[GitHub] [kafka] cadonna commented on pull request #12379: KAFKA-10199: Remove call to Task#completeRestoration from state updater
cadonna commented on PR #12379: URL: https://github.com/apache/kafka/pull/12379#issuecomment-1176062651 Test failure is unrelated: ``` Build / JDK 11 and Scala 2.13 / testTaskCancellation() – org.apache.kafka.trogdor.coordinator.CoordinatorTest ```
[GitHub] [kafka] cadonna opened a new pull request, #12384: KAFKA-10199: Add methods to add and remove tasks to task manager
cadonna opened a new pull request, #12384: URL: https://github.com/apache/kafka/pull/12384 To integrate the state updater into the current code, we need the ability to add and remove tasks from the task manager. This functionality is needed to ensure that a task is managed either by the task manager or by the state updater but not by both. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
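The invariant described in this PR (a task is managed by exactly one of the two components at any time) could be illustrated with a small registry like the one below. It is a minimal sketch under assumptions: `TaskOwnership`, `Owner`, and the use of plain strings as task ids are hypothetical and do not reflect the actual `Tasks` or `TaskManager` code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry that enforces single ownership of a task; not Kafka Streams code. */
final class TaskOwnership {
    enum Owner { TASK_MANAGER, STATE_UPDATER }

    private final Map<String, Owner> ownerByTaskId = new HashMap<>();

    /** Registers a task with an owner; fails if some component already owns it. */
    void add(String taskId, Owner owner) {
        Owner existing = ownerByTaskId.putIfAbsent(taskId, owner);
        if (existing != null) {
            throw new IllegalStateException(
                "Task " + taskId + " is already owned by " + existing);
        }
    }

    /** Removes a task and returns its previous owner; fails if the task is unknown. */
    Owner remove(String taskId) {
        Owner owner = ownerByTaskId.remove(taskId);
        if (owner == null) {
            throw new IllegalStateException("Task unknown: " + taskId);
        }
        return owner;
    }

    /** Moves a task to another owner by removing it first and then adding it. */
    void handOver(String taskId, Owner newOwner) {
        remove(taskId);
        add(taskId, newOwner);
    }

    /** Returns the current owner, or null if the task is not registered. */
    Owner ownerOf(String taskId) {
        return ownerByTaskId.get(taskId);
    }
}
```

Because a hand-over always removes before it adds, there is no point at which both components own the same task.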
[jira] [Created] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join
Saumya Gupta created KAFKA-14049: Summary: Relax Non Null Requirement for KStreamGlobalKTable Left Join Key: KAFKA-14049 URL: https://issues.apache.org/jira/browse/KAFKA-14049 Project: Kafka Issue Type: Improvement Components: streams Reporter: Saumya Gupta Null values in the stream for a left join would indicate a tombstone message that needs to be propagated if not actually joined with the GlobalKTable message; hence these messages should not be ignored. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] singhnama commented on pull request #12359: KAFKA-13983: Support special character in Resource name in ACLs operation by sanitizing
singhnama commented on PR #12359: URL: https://github.com/apache/kafka/pull/12359#issuecomment-1176054747 Sanitizing the resource name has a compatibility issue with already existing ACLs. Also, ZooKeeper disallows only a few special characters, which are listed here: [https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html](https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html). So we decided to fail the creation when the resource name contains `/`, since this character is allowed by ZooKeeper but is what causes the problem. Not allowed characters: `The null character (\u0000) cannot be part of a path name. (This causes problems with the C binding.) The following characters can't be used because they don't display well, or render in confusing ways: \u0001 - \u0019 and \u007F - \u009F. The following characters are not allowed: \ud800 -uF8FFF, \uFFF0-uFFFF, \uXFFFE - \uXFFFF (where X is a digit 1 - E), \uF0000 - \uFFFFF. The "." character can be used as part of another name, but "." and ".." cannot alone be used to indicate a node along a path, because ZooKeeper doesn't use relative paths. The following would be invalid: "/a/b/./c" or "/a/b/../c". The token "zookeeper" is reserved.`
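The decision described in this comment (reject a resource name that contains `/` instead of sanitizing it) might look roughly like the check below. This is a sketch only: `AclResourceNames` and `validate` are hypothetical names, and the real PR may perform the check elsewhere and with a different exception type.

```java
/** Hypothetical validation helper; an assumption for illustration, not the PR's code. */
final class AclResourceNames {
    private AclResourceNames() { }

    /**
     * Fails fast when the resource name contains '/', which ZooKeeper accepts
     * as a path separator and would therefore split the name into several nodes.
     */
    static void validate(String resourceName) {
        if (resourceName == null || resourceName.isEmpty()) {
            throw new IllegalArgumentException("Resource name must not be empty");
        }
        if (resourceName.indexOf('/') >= 0) {
            throw new IllegalArgumentException(
                "Resource name must not contain '/': " + resourceName);
        }
    }
}
```

Failing at creation time keeps existing ACLs untouched, which is the compatibility concern raised against sanitizing.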
[GitHub] [kafka] yufeiyan1220 commented on a diff in pull request #12360: KAFKA-14032: Dequeue time for forwarded requests is ignored to set
yufeiyan1220 commented on code in PR #12360: URL: https://github.com/apache/kafka/pull/12360#discussion_r914629492 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -307,8 +307,12 @@ class KafkaApisTest { Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava)) val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, false).build(requestHeader.apiVersion) +val startTimeNanos = time.nanoseconds() +val dequeueCostNanos = 5 * 1000 * 1000 val request = TestUtils.buildEnvelopeRequest( - alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds()) + alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, startTimeNanos) +// add dequeue time to simulate request handlers poll request from requestQueue +request.requestDequeueTimeNanos = startTimeNanos + dequeueCostNanos Review Comment: Done
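The arithmetic behind the test change above can be made explicit with a small sketch: if the dequeue timestamp is set to the start time plus 5 * 1000 * 1000 ns, the time spent in the queue is 5 ms. `RequestTimings` and its members are hypothetical names used only for this illustration; they are not the fields of Kafka's request class.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical holder for request timestamps; an assumption, not Kafka's request class. */
final class RequestTimings {
    private final long startTimeNanos;
    // -1 means the request has not been taken from the queue yet
    private long requestDequeueTimeNanos = -1L;

    RequestTimings(long startTimeNanos) {
        this.startTimeNanos = startTimeNanos;
    }

    void markDequeued(long dequeueTimeNanos) {
        this.requestDequeueTimeNanos = dequeueTimeNanos;
    }

    /** Time spent waiting in the queue, or 0 if the dequeue time was never set. */
    long queueTimeNanos() {
        if (requestDequeueTimeNanos < 0) {
            return 0L;
        }
        return Math.max(0L, requestDequeueTimeNanos - startTimeNanos);
    }

    long queueTimeMs() {
        return TimeUnit.NANOSECONDS.toMillis(queueTimeNanos());
    }
}
```

The zero fallback shows why a forgotten dequeue timestamp silently distorts any metric derived from it.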
[jira] [Commented] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563060#comment-17563060 ] Bruno Cadonna commented on KAFKA-14014: --- [~mdedetrich-aiven] Thank you for the investigation! That is indeed interesting! Maybe you can restructure the test to make it less flaky. [~wcarlson5][~ableegoldman] Do you maybe have some hints to make the test less flaky given Matthew's findings? > Flaky test > NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets() > > > Key: KAFKA-14014 > URL: https://issues.apache.org/jira/browse/KAFKA-14014 > Project: Kafka > Issue Type: Test > Components: streams >Reporter: Bruno Cadonna >Priority: Critical > Labels: flaky-test > > {code:java} > java.lang.AssertionError: > Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]> > but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base/java.lang.Thread.run(Thread.java:833) > {code} > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/ > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-14014: -- Labels: flaky-test (was: )
[jira] [Created] (KAFKA-14048) The Next Generation of the Consumer Rebalance Protocol
David Jacot created KAFKA-14048: --- Summary: The Next Generation of the Consumer Rebalance Protocol Key: KAFKA-14048 URL: https://issues.apache.org/jira/browse/KAFKA-14048 Project: Kafka Issue Type: Improvement Reporter: David Jacot Assignee: David Jacot This Jira tracks the development of KIP-848: https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dengziming closed pull request #12157: MINOR: Support co-resident mode in KRaft TestKit
dengziming closed pull request #12157: MINOR: Support co-resident mode in KRaft TestKit URL: https://github.com/apache/kafka/pull/12157
[GitHub] [kafka] dengziming commented on pull request #12157: MINOR: Support co-resident mode in KRaft TestKit
dengziming commented on PR #12157: URL: https://github.com/apache/kafka/pull/12157#issuecomment-1175912322 Closing this for now since it has been done in another PR; I created KAFKA-14047 for the discussion.
[jira] [Created] (KAFKA-14047) Use KafkaRaftManager in KRaft TestKit
dengziming created KAFKA-14047: -- Summary: Use KafkaRaftManager in KRaft TestKit Key: KAFKA-14047 URL: https://issues.apache.org/jira/browse/KAFKA-14047 Project: Kafka Issue Type: Test Reporter: dengziming We are using the lower-level {{ControllerServer}} and {{BrokerServer}} in TestKit; we can improve it to use KafkaRaftManager. See the discussion here: https://github.com/apache/kafka/pull/12157#discussion_r882179407
[GitHub] [kafka] dengziming commented on a diff in pull request #12265: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating
dengziming commented on code in PR #12265: URL: https://github.com/apache/kafka/pull/12265#discussion_r914535136 ## core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala: ## @@ -240,6 +239,40 @@ class BrokerMetadataListenerTest { } } + @Test + def testNotSnapshotAfterMetadataVersionChangeBeforePublishing(): Unit = { +val snapshotter = new MockMetadataSnapshotter() +val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter), + maxBytesBetweenSnapshots = 1000L) + +updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), 100L) +listener.getImageRecords().get() +assertEquals(-1L, snapshotter.activeSnapshotOffset, "We won't generate snapshot before starting publishing") + } + + @Test + def testSnapshotAfterMetadataVersionChangeWhenStarting(): Unit = { +val snapshotter = new MockMetadataSnapshotter() +val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter), + maxBytesBetweenSnapshots = 1000L) + +updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), 100L) +listener.startPublishing(new MockMetadataPublisher()).get() +assertEquals(100L, snapshotter.activeSnapshotOffset, "We should try to generate snapshot when starting publishing") + } + + @Test + def testSnapshotAfterMetadataVersionChange(): Unit = { +val snapshotter = new MockMetadataSnapshotter() +val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter), + maxBytesBetweenSnapshots = 1000L) +listener.startPublishing(new MockMetadataPublisher()).get() + +updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV2.featureLevel(), 100L) Review Comment: You are right. Here I tried a different solution: update the feature level to `MetadataVersion.latest().featureLevel() - 1`, so that we can be sure it's different from the current feature level. PTAL.
[GitHub] [kafka] dengziming commented on a diff in pull request #12265: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating
dengziming commented on code in PR #12265: URL: https://github.com/apache/kafka/pull/12265#discussion_r914533237 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala: ## @@ -45,26 +45,37 @@ class BrokerMetadataSnapshotter( */ private var _currentSnapshotOffset = -1L + /** + * The offset of the newest snapshot, or -1 if there hasn't been one.Accessed only under + * the object lock. + */ + private var _latestSnapshotOffset = -1L + /** * The event queue which runs this listener. */ val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse("")) override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean = synchronized { -if (_currentSnapshotOffset == -1L) { +if (_currentSnapshotOffset != -1) { + warn(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " + +s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}") + false +} else if (_latestSnapshotOffset >= image.highestOffsetAndEpoch().offset) { Review Comment: Yes, the test failed when a snapshot was generated twice at the same offset: first because enough bytes had accumulated, and second because the metadata version changed. I changed the comparison to "==" to make it more accurate and added a unit test for it.
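The guard discussed in this review comment (decline a new snapshot while one is in progress, and decline a second snapshot at an offset that has already been snapshotted) can be sketched as below. This is a simplified illustration under assumptions: `SnapshotGate`, `maybeStart` and `complete` are hypothetical names, and the sketch uses the "==" comparison mentioned in the comment rather than the ">=" shown in the quoted diff.

```java
/** Hypothetical, simplified snapshot guard; not the BrokerMetadataSnapshotter itself. */
final class SnapshotGate {
    // Offset of the snapshot in progress, or -1 if none is running
    private long currentSnapshotOffset = -1L;
    // Offset of the most recently completed snapshot, or -1 if there is none
    private long latestSnapshotOffset = -1L;

    /** Returns true and records the offset if a snapshot may start now. */
    synchronized boolean maybeStart(long imageOffset) {
        if (currentSnapshotOffset != -1L) {
            return false; // a snapshot is already in progress
        }
        if (latestSnapshotOffset == imageOffset) {
            return false; // this offset has already been snapshotted
        }
        currentSnapshotOffset = imageOffset;
        return true;
    }

    /** Marks the running snapshot as finished and remembers its offset. */
    synchronized void complete() {
        if (currentSnapshotOffset == -1L) {
            throw new IllegalStateException("No snapshot in progress");
        }
        latestSnapshotOffset = currentSnapshotOffset;
        currentSnapshotOffset = -1L;
    }
}
```

Two triggers firing at the same offset, such as accumulated bytes followed by a metadata version change, then produce only one snapshot.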
[GitHub] [kafka] showuon commented on pull request #12380: MINOR: Get rid of agent checks in Jenkinsfile
showuon commented on PR #12380: URL: https://github.com/apache/kafka/pull/12380#issuecomment-1175906691 The `ARM build` is still failing. I rebuilt it and it failed again. https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12380/2/pipeline/76 I can also take a look when I have time. Thanks.
[jira] [Comment Edited] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563004#comment-17563004 ] Matthew de Detrich edited comment on KAFKA-14014 at 7/6/22 7:29 AM: [~cadonna] So I did some debugging on this ticket over the past week and I found out some interesting things. To start off with, I did manage to predictably replicate the test's flakiness, and it does appear to be related to load, i.e. the test is more flaky the fewer CPU resources it has. I am using Docker (i.e. running the tests within the Docker Gradle image) with the --cpu flag to limit resources. Interestingly, I have gone up to 5 CPUs and it's still flaking out, albeit less often. I attempted to increase the various timeouts that are used in the test, but this had no effect, so I am going to dig a bit further. Note that 5 CPUs is already considered "high" (at least for a machine that would reasonably run Kafka Streams). The machine I am running on has 12 "CPUs" (6 cores, 12 threads), and at least when running with all of the resources on the machine I couldn't replicate the flaky test. was (Author: mdedetrich-aiven): [~cadonna] So I did some debugging on this ticket over the past week and I found out some interesting things. To start off with, I did manage to predictably replicate the test, and its flakiness does appear to be related to load, i.e. the test is more flaky the fewer CPU resources it has. I am using Docker (i.e. running the tests within the Docker Gradle image) with the --cpu flag to limit resources. Interestingly, I have gone up to 5 CPUs and it's still flaking out, albeit less often. I attempted to increase the various timeouts that are used in the test, but this had no effect, so I am going to dig a bit further. Note that 5 CPUs is already considered "high" (at least for a machine that would reasonably run Kafka Streams). The machine I am running on has 12 "CPUs" (6 cores, 12 threads), and at least when running with all of the resources on the machine I couldn't replicate the flaky test.
[jira] [Comment Edited] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563004#comment-17563004 ] Matthew de Detrich edited comment on KAFKA-14014 at 7/6/22 7:27 AM: [~cadonna] So I did some debugging on this ticket over the past week and I found out some interesting things. To start off with, I did manage to predictably replicate the test, and its flakiness does appear to be related to load, i.e. the test is more flaky the fewer CPU resources it has. I am using Docker (i.e. running the tests within the Docker Gradle image) with the --cpu flag to limit resources. Interestingly, I have gone up to 5 CPUs and it's still flaking out, albeit less often. I attempted to increase the various timeouts that are used in the test, but this had no effect, so I am going to dig a bit further. Note that 5 CPUs is already considered "high" (at least for a machine that would reasonably run Kafka Streams). The machine I am running on has 12 "CPUs" (6 cores, 12 threads), and at least when running with all of the resources on the machine I couldn't replicate the flaky test. was (Author: mdedetrich-aiven): [~cadonna] So I did some debugging on this ticket over the past week and I found out some interesting things. To start off with, the test is more flaky the fewer CPU resources it has. I am using Docker (i.e. running the tests within the Docker Gradle image) with the --cpu flag to limit resources. Interestingly, I have gone up to 5 CPUs and it's still flaking out, albeit less often. I attempted to increase the various timeouts that are used in the test, but this had no effect, so I am going to dig a bit further. Note that 5 CPUs is already considered "high" (at least for a machine that would reasonably run Kafka Streams). The machine I am running on has 12 "CPUs" (6 cores, 12 threads), and at least when running with all of the resources on the machine I couldn't replicate the flaky test.
[jira] [Comment Edited] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563004#comment-17563004 ]

Matthew de Detrich edited comment on KAFKA-14014 at 7/6/22 7:26 AM:

[~cadonna] I did some debugging on this ticket over the past week and found some interesting things. To start with, the test is flakier the fewer CPU resources it has. I am running the tests within the docker gradle image, using the --cpu flag to limit resources. Interestingly, I have gone up to 5 CPUs and it is still flaking, albeit less often. I attempted to increase the various timeouts that are used in the test, but this had no effect, so I am going to dig a bit further.

Note that 5 CPUs is already considered "high" (at least for a machine that would reasonably run Kafka Streams). The machine I am running on has 12 "CPUs" (6 cores, 12 threads), and at least when running with all of the resources on the machine I couldn't reproduce the flaky test.

was (Author: mdedetrich-aiven):

[~cadonna] I did some debugging on this ticket over the past week and found some interesting things. To start with, the test is flakier the fewer CPU resources it has. I am running the tests within the docker gradle image, using the --cpu flag to limit resources. Interestingly, I have gone up to 5 CPUs and it is still flaking, albeit less often. I attempted to increase the various timeouts that are used in the test, but this had no effect, so I am going to dig a bit further.
> Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
>
> Key: KAFKA-14014
> URL: https://issues.apache.org/jira/browse/KAFKA-14014
> Project: Kafka
> Issue Type: Test
> Components: streams
> Reporter: Bruno Cadonna
> Priority: Critical
>
> {code:java}
> java.lang.AssertionError:
> Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
>      but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>     at org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>     at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563004#comment-17563004 ]

Matthew de Detrich commented on KAFKA-14014:

[~cadonna] I did some debugging on this ticket over the past week and found some interesting things. To start with, the test is flakier the fewer CPU resources it has. I am running the tests within the docker gradle image, using the --cpu flag to limit resources. Interestingly, I have gone up to 5 CPUs and it is still flaking, albeit less often. I attempted to increase the various timeouts that are used in the test, but this had no effect, so I am going to dig a bit further.

> Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
>
> Key: KAFKA-14014
> URL: https://issues.apache.org/jira/browse/KAFKA-14014
> Project: Kafka
> Issue Type: Test
> Components: streams
> Reporter: Bruno Cadonna
> Priority: Critical
>
> {code:java}
> java.lang.AssertionError:
> Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
>      but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>     at org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>     at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/

--
This message was sent by Atlassian Jira (v8.20.10#820010)