[jira] [Assigned] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.
[ https://issues.apache.org/jira/browse/KAFKA-15127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sagar Rao reassigned KAFKA-15127:
---------------------------------

    Assignee: Sagar Rao

> Allow offsets to be reset at the same time a connector is deleted.
> -------------------------------------------------------------------
>
>                 Key: KAFKA-15127
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15127
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Sagar Rao
>            Assignee: Sagar Rao
>            Priority: Major
>              Labels: needs-kip
>
> This has been listed as [Future Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors] in KIP-875. Now that the delete offsets mechanism is also in place, we can take this up, which will allow connector names to be reused after connector deletion.
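Until that improvement lands, resetting offsets so a connector name can be cleanly reused takes three calls against the Connect REST API. A minimal Java sketch of that sequence, assuming a local worker on port 8083 and a hypothetical connector named my-connector:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse.BodyHandlers;

public class ResetThenDelete {
    // Hypothetical worker URL and connector name.
    static final String BASE = "http://localhost:8083/connectors/my-connector";

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // 1. Stop the connector; offsets can only be reset while it is STOPPED.
        send(client, HttpRequest.newBuilder(URI.create(BASE + "/stop"))
                .PUT(HttpRequest.BodyPublishers.noBody()).build());
        // 2. Reset its offsets (KIP-875).
        send(client, HttpRequest.newBuilder(URI.create(BASE + "/offsets")).DELETE().build());
        // 3. Delete the connector, freeing the name for reuse.
        send(client, HttpRequest.newBuilder(URI.create(BASE)).DELETE().build());
    }

    static void send(HttpClient client, HttpRequest request) throws Exception {
        System.out.println(request.method() + " " + request.uri() + " -> "
                + client.send(request, BodyHandlers.ofString()).statusCode());
    }
}
```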
[jira] [Assigned] (KAFKA-15106) AbstractStickyAssignor may get stuck in 3.5
[ https://issues.apache.org/jira/browse/KAFKA-15106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

li xiangyuan reassigned KAFKA-15106:
------------------------------------

    Assignee: li xiangyuan

> AbstractStickyAssignor may get stuck in 3.5
> --------------------------------------------
>
>                 Key: KAFKA-15106
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15106
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.5.0
>            Reporter: li xiangyuan
>            Assignee: li xiangyuan
>            Priority: Major
>
> This is easy to reproduce in a unit test: in org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest#testLargeAssignmentAndGroupWithNonEqualSubscription, set partitionCount = 200 and consumerCount = 20, and you can see that isBalanced will return false forever.
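A standalone approximation of the reported repro, using the public CooperativeStickyAssignor (which extends AbstractStickyAssignor) rather than the unit test itself. The nested, non-equal subscription pattern and the partitionCount/consumerCount values mirror the report; everything else is illustrative. On an affected 3.5.0 client, the assign() call may spin inside performReassignments until the rebalance times out:

```java
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class StickyAssignorRepro {
    public static void main(String[] args) {
        int partitionCount = 200; // values from the report
        int consumerCount = 20;

        Node node = new Node(0, "localhost", 9092);
        List<String> topics = new ArrayList<>();
        List<PartitionInfo> partitions = new ArrayList<>();
        for (int t = 0; t < partitionCount; t++) {
            String topic = "topic" + t;
            topics.add(topic);
            partitions.add(new PartitionInfo(topic, 0, node, new Node[]{node}, new Node[]{node}));
        }
        Cluster cluster = new Cluster("test", List.of(node), partitions, Set.of(), Set.of());

        // Non-equal subscriptions: consumer i subscribes to a growing prefix of the topics.
        Map<String, Subscription> subscriptions = new HashMap<>();
        for (int c = 0; c < consumerCount; c++) {
            int end = (c + 1) * partitionCount / consumerCount;
            subscriptions.put("consumer" + c, new Subscription(topics.subList(0, end)));
        }

        new CooperativeStickyAssignor()
                .assign(cluster, new GroupSubscription(subscriptions))
                .groupAssignment()
                .forEach((member, assignment) ->
                        System.out.println(member + " -> " + assignment.partitions().size()));
    }
}
```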
[jira] [Created] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.
Sagar Rao created KAFKA-15127:
---------------------------------

             Summary: Allow offsets to be reset at the same time a connector is deleted.
                 Key: KAFKA-15127
                 URL: https://issues.apache.org/jira/browse/KAFKA-15127
             Project: Kafka
          Issue Type: Improvement
          Components: KafkaConnect
            Reporter: Sagar Rao

This has been listed as [Future Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors] in KIP-875. Now that the delete offsets mechanism is also in place, we can take this up, which will allow connector names to be reused after connector deletion.
[GitHub] [kafka] flashmouse commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predicate
flashmouse commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1608876813 @rajinisivaram @dajac plz have a look, thx!
[GitHub] [kafka] flashmouse opened a new pull request, #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predicate
flashmouse opened a new pull request, #13920: URL: https://github.com/apache/kafka/pull/13920 https://issues.apache.org/jira/browse/KAFKA-15106 In 3.5.0, AbstractStickyAssignor may run a useless loop in ``performReassignments`` because ``isBalanced`` has a trivial mistake, which can result in a rebalance timeout in some situations. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status
[GitHub] [kafka] yashmayya commented on a diff in pull request #13915: KAFKA-14930: Document the new PATCH and DELETE offsets REST APIs for Connect
yashmayya commented on code in PR #13915: URL: https://github.com/apache/kafka/pull/13915#discussion_r1243145305 ## docs/connect.html: ## @@ -313,7 +313,13 @@ REST API DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration GET /connectors/{name}/topics - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued PUT /connectors/{name}/topics/reset - send a request to empty the set of active topics of a connector -GET /connectors/{name}/offsets - get the current offsets for a connector (see KIP-875: https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect for more details) +Offsets management REST APIs (see KIP-875: https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect for more details): + +GET /connectors/{name}/offsets - get the current offsets for a connector +DELETE /connectors/{name}/offsets - reset the offsets for a connector. The connector must exist and must be in the stopped state. +PATCH /connectors/{name}/offsets - alter the offsets for a connector. The connector must exist and must be in the stopped state. The request body should be a JSON object containing a JSON array offsets field, similar to the response body of the GET /connectors/{name}/offsets REST API. Review Comment: The link to the generated OpenAPI docs is at the end of this section, so I don't think it's necessary to add the same link here as well. Also, I think an actual example might be more helpful than the generated OpenAPI spec, where the finest level of granularity is the `ConnectorOffset` schema describing the `partition` and `offset` keys as having JSON object values. I was hoping that the link to the KIP would be sufficient, but I do see the value of including actual examples directly in the docs as well.
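Along those lines, a request-body sketch of the kind being discussed. The offsets array shape follows KIP-875; the partition/offset keys (filename/position) are connector-specific examples in the style of a FileStream source connector, and the worker URL and connector name are hypothetical:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse.BodyHandlers;

public class PatchConnectorOffsets {
    public static void main(String[] args) throws Exception {
        // "offsets" is a JSON array, mirroring the GET /connectors/{name}/offsets response.
        String body = """
                {
                  "offsets": [
                    {
                      "partition": {"filename": "/tmp/test.txt"},
                      "offset": {"position": 30}
                    }
                  ]
                }""";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-connector/offsets"))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(body))
                .build();

        var response = HttpClient.newHttpClient().send(request, BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```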
[GitHub] [kafka] yashmayya commented on a diff in pull request #13915: KAFKA-14930: Document the new PATCH and DELETE offsets REST APIs for Connect
yashmayya commented on code in PR #13915: URL: https://github.com/apache/kafka/pull/13915#discussion_r1243142955 ## docs/connect.html: ## @@ -313,7 +313,13 @@ REST API DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration GET /connectors/{name}/topics - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued PUT /connectors/{name}/topics/reset - send a request to empty the set of active topics of a connector -GET /connectors/{name}/offsets - get the current offsets for a connector (see KIP-875: https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect for more details) +Offsets management REST APIs (see KIP-875: https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect for more details): + +GET /connectors/{name}/offsets - get the current offsets for a connector +DELETE /connectors/{name}/offsets - reset the offsets for a connector. The connector must exist and must be in the stopped state. Review Comment: Hm, the `PUT /connectors/{name}/stop` endpoint docs are in the same `connect_rest` section as these docs, and I don't think it is possible to link to individual list items inside the section?
[GitHub] [kafka] hudeqi commented on pull request #13852: KAFKA-15086:Set a reasonable segment size upper limit for MM2 internal topics
hudeqi commented on PR #13852: URL: https://github.com/apache/kafka/pull/13852#issuecomment-1608769851 > @hudeqi sorry, this is a tricky issue and I'm trying to take time to think things through :) > > I hate to say it, but I don't think we can make this change or anything like it without a KIP. This is for two reasons: > > 1. We're effectively changing the default value for the `offset.storage.topic.segment.bytes` property (even if we don't implement this change with that exact logic), which counts as a change to public API for the project > 2. By explicitly setting a value for the offset topic's `segment.bytes` property, we cause any broker-side value for the [log.segment.bytes property](https://kafka.apache.org/documentation.html#brokerconfigs_log.segment.bytes) to be ignored. If the broker uses a lower value for this property than our default, then we may make things worse instead of better > > I still think it's likely that decreasing the segment size for the offsets topic would help, but it'd be nice if we could get the kind of review that a KIP requires before making that kind of change. > > As far as increasing the number of consumer threads goes, I think it's really a question of what the performance bottleneck is when reading to the end of the topic. If CPU is the issue, then sure, it'd probably help to scale up the number of consumers. However, if network transfer between the worker and the Kafka cluster is the limiting factor, then it won't have any impact. The nice thing about decreasing the segment size is that (as long as it leads to a reduction in the total size of the offsets topic), it would help in either case: you'd have less data to consume from Kafka, and also less data to process on your Connect worker. > > This almost certainly varies depending on the environment Kafka Connect and Kafka are run in, but my hunch is that your fix here would be more effective than scaling up the number of consumers. I'd be curious to see if we could get benchmark numbers on that front, though. Thanks, I will issue a KIP and reopen this PR later.
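While the KIP is pending, an operator can get the same effect per cluster by lowering segment.bytes on the already-created offsets topic through the Admin API. A sketch, assuming a hypothetical MM2 offsets topic name and bootstrap address:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ShrinkOffsetsTopicSegments {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        try (Admin admin = Admin.create(props)) {
            // Hypothetical MM2 offsets topic name; check your MM2 config for the real one.
            ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "mm2-offsets.source.internal");
            AlterConfigOp setSegmentBytes = new AlterConfigOp(
                    new ConfigEntry("segment.bytes", String.valueOf(100 * 1024 * 1024)),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(setSegmentBytes))).all().get();
        }
    }
}
```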
[jira] [Commented] (KAFKA-13965) Document broker-side socket-server-metrics
[ https://issues.apache.org/jira/browse/KAFKA-13965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737431#comment-17737431 ]

David Jameson commented on KAFKA-13965:
----------------------------------------

This looks like a good first ticket to get familiar with the codebase. I am keen to pick it up.

> Document broker-side socket-server-metrics
> ------------------------------------------
>
>                 Key: KAFKA-13965
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13965
>             Project: Kafka
>          Issue Type: Improvement
>          Components: documentation
>    Affects Versions: 3.2.0
>            Reporter: James Cheng
>            Priority: Major
>              Labels: newbie, newbie++
>
> There are a bunch of broker JMX metrics in the "socket-server-metrics" space that are not documented on kafka.apache.org/documentation:
>
> * MBean: kafka.server:type=socket-server-metrics,listener=,networkProcessor=
>   (from KIP-188: https://cwiki.apache.org/confluence/display/KAFKA/KIP-188+-+Add+new+metrics+to+support+health+checks)
> * kafka.server:type=socket-server-metrics,name=connection-accept-rate,listener={listenerName}
>   (from KIP-612: https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers)
>
> It would be helpful to get all the socket-server-metrics documented.
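For anyone picking this up, the full set of metric names to document can be dumped straight from a running broker over JMX. A small sketch, assuming the broker was started with remote JMX enabled on port 9999:

```java
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class DumpSocketServerMetrics {
    public static void main(String[] args) throws Exception {
        // Assumes -Dcom.sun.management.jmxremote.port=9999 on the broker.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection server = connector.getMBeanServerConnection();
            // Match every MBean in the socket-server-metrics space.
            ObjectName pattern = new ObjectName("kafka.server:type=socket-server-metrics,*");
            for (ObjectName mbean : server.queryNames(pattern, null)) {
                System.out.println(mbean);
                for (MBeanAttributeInfo attr : server.getMBeanInfo(mbean).getAttributes()) {
                    System.out.println("  " + attr.getName());
                }
            }
        }
    }
}
```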
[GitHub] [kafka] github-actions[bot] commented on pull request #13375: KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode
github-actions[bot] commented on PR #13375: URL: https://github.com/apache/kafka/pull/13375#issuecomment-1608729834 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[GitHub] [kafka] iblislin opened a new pull request, #13919: MINOR: doc/streams/dsl-api, fix href of "KTable-KTable Foreign-Key Joins"
iblislin opened a new pull request, #13919: URL: https://github.com/apache/kafka/pull/13919 The `href` shown in ToC is here: https://github.com/apache/kafka/blob/c5889fceddb9a0174452ae60a57c8ff3f087a6a4/docs/streams/developer-guide/dsl-api.html#L52
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jeffkbkim commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1243002218 ## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator.group + +import kafka.server.ReplicaManager +import kafka.utils.Logging +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.NotLeaderOrFollowerException +import org.apache.kafka.common.record.{FileRecords, MemoryRecords} +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, UnknownRecordTypeException} +import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback} +import org.apache.kafka.server.util.KafkaScheduler +import org.apache.kafka.storage.internals.log.FetchIsolation + +import java.nio.ByteBuffer +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicBoolean +import scala.jdk.CollectionConverters._ + +/** + * Coordinator loader which reads records from a partition and replays them + * to a group coordinator. + * + * @param replicaManager The replica manager. + * @param deserializer The deserializer to use. + * @param loadBufferSize The load buffer size. + * @tparam T The record type. + */ +class CoordinatorLoaderImpl[T]( + replicaManager: ReplicaManager, + deserializer: Deserializer[T], + loadBufferSize: Int +) extends CoordinatorLoader[T] with Logging { + private val isRunning = new AtomicBoolean(true) + private val scheduler = new KafkaScheduler(1) + scheduler.startup() + + /** + * Loads the coordinator by reading all the records from the TopicPartition + * and applying them to the Replayable object. + * + * @param tp The TopicPartition to read from. + * @param coordinator The object to apply records to.
+ */ + override def load( +tp: TopicPartition, +coordinator: CoordinatorPlayback[T] +): CompletableFuture[Void] = { +val future = new CompletableFuture[Void]() +val result = scheduler.scheduleOnce(s"Load coordinator from $tp", + () => doLoad(tp, coordinator, future)) +if (result.isCancelled) { + future.completeExceptionally(new RuntimeException("Coordinator loader is closed.")) +} +future + } + + private def doLoad( +tp: TopicPartition, +coordinator: CoordinatorPlayback[T], +future: CompletableFuture[Void] + ): Unit = { +try { + replicaManager.getLog(tp) match { +case None => + future.completeExceptionally(new NotLeaderOrFollowerException( +s"Could not load records from $tp because the log does not exist.")) + +case Some(log) => + def logEndOffset: Long = replicaManager.getLogEndOffset(tp).getOrElse(-1L) + + // buffer may not be needed if records are read from memory + var buffer = ByteBuffer.allocate(0) + // loop breaks if leader changes at any time during the load, since logEndOffset is -1 + var currOffset = log.logStartOffset + // loop breaks if no records have been read, since the end of the log has been reached + var readAtLeastOneRecord = true + + while (currOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) { +val fetchDataInfo = log.read( + startOffset = currOffset, + maxLength = loadBufferSize, + isolation = FetchIsolation.LOG_END, + minOneMessage = true +) + +readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0 + +val memoryRecords = (fetchDataInfo.records: @unchecked) match { + case records: MemoryRecords => +records + + case fileRecords: FileRecords => +val sizeInBytes = fileRecords.sizeInBytes +val bytesNeeded = Math.max(loadBufferSize, sizeInBytes) + +// minOneMessage = true in the above log.read means that the buffer may need to Review Comment: nit: was confused on log.read. maybe use `log.read()`? ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Record
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner
gharris1727 commented on code in PR #13821: URL: https://github.com/apache/kafka/pull/13821#discussion_r1242974877 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ## @@ -104,6 +81,33 @@ public DelegatingClassLoader(List<Path> pluginLocations) { this(pluginLocations, DelegatingClassLoader.class.getClassLoader()); } +public Set<PluginSource> sources() { Review Comment: The refactor was just a little bit more involved (as I had to figure out a new mocking strategy for newPluginClassLoader) but I think it's better now. I like the static implementation, and I really like how empty the DCL class is now. I'm not sure exactly where to put the function though, and just stuffed it into PluginUtils for now. Do you think this is more appropriate somewhere else?
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner
gharris1727 commented on code in PR #13821: URL: https://github.com/apache/kafka/pull/13821#discussion_r1242962203 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.connect.components.Versioned; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.sql.Driver; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.ServiceConfigurationError; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +public abstract class PluginScanner { + +private static final Logger log = LoggerFactory.getLogger(PluginScanner.class); + +public PluginScanResult discoverPlugins(Set<PluginSource> sources) { +long startMs = System.currentTimeMillis(); +List<PluginScanResult> results = new ArrayList<>(); +for (PluginSource source : sources) { +results.add(scanUrlsAndAddPlugins(source)); +} +long endMs = System.currentTimeMillis(); +log.info("Scanning plugins with {} took {} ms", getClass().getSimpleName(), endMs - startMs); +return new PluginScanResult(results); +} + +private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) { +PluginScanResult plugins = scanPlugins(source); +loadJdbcDrivers(source.loader()); +return plugins; +} + +protected abstract PluginScanResult scanPlugins(PluginSource source); + +private void loadJdbcDrivers(final ClassLoader loader) { +// Apply here what java.sql.DriverManager does to discover and register classes +// implementing the java.sql.Driver interface.
+AccessController.doPrivileged( +(PrivilegedAction<Void>) () -> { +ServiceLoader<Driver> loadedDrivers = ServiceLoader.load( +Driver.class, +loader +); +Iterator<Driver> driversIterator = loadedDrivers.iterator(); +try { +while (driversIterator.hasNext()) { +Driver driver = driversIterator.next(); +log.debug( +"Registered java.sql.Driver: {} to java.sql.DriverManager", +driver +); +} +} catch (Throwable t) { +log.debug( +"Ignoring java.sql.Driver classes listed in resources but not" ++ " present in class loader's classpath: ", +t +); +} +return null; +} +); +} + +@SuppressWarnings({"rawtypes", "unchecked"}) +protected PluginDesc pluginDesc(Class plugin, String version, ClassLoader loader) { +return new PluginDesc(plugin, version, loader); +} + +@SuppressWarnings("unchecked") +protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) { +SortedSet<PluginDesc<T>> result = new TreeSet<>(); +ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader); +for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) { +try (LoaderSwap loaderSwap = withClassLoader(loader)) { +T pluginImpl; +try { +pluginImpl = iterator.next(); +} catch (ServiceConfigurationError t) { +log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t); +continue; +} +Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass(); +if (pluginKlass.getClassLoader() != loader) { +log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading"
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner
gharris1727 commented on code in PR #13821: URL: https://github.com/apache/kafka/pull/13821#discussion_r1242961259 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ## @@ -147,229 +151,7 @@ PluginClassLoader newPluginClassLoader( ); } -public PluginScanResult initLoaders() { -List results = new ArrayList<>(); -for (Path pluginLocation : pluginLocations) { -try { -results.add(registerPlugin(pluginLocation)); -} catch (InvalidPathException | MalformedURLException e) { -log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e); -} catch (IOException e) { -log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e); -} -} -// Finally add parent/system loader. -results.add(scanUrlsAndAddPlugins( -getParent(), -ClasspathHelper.forJavaClassPath().toArray(new URL[0]) -)); -PluginScanResult scanResult = new PluginScanResult(results); -installDiscoveredPlugins(scanResult); -return scanResult; -} - -private PluginScanResult registerPlugin(Path pluginLocation) -throws IOException { -log.info("Loading plugin from: {}", pluginLocation); -List pluginUrls = new ArrayList<>(); -for (Path path : PluginUtils.pluginUrls(pluginLocation)) { -pluginUrls.add(path.toUri().toURL()); -} -URL[] urls = pluginUrls.toArray(new URL[0]); -if (log.isDebugEnabled()) { -log.debug("Loading plugin urls: {}", Arrays.toString(urls)); -} -PluginClassLoader loader = newPluginClassLoader( -pluginLocation.toUri().toURL(), -urls, -this -); -return scanUrlsAndAddPlugins(loader, urls); -} - -private PluginScanResult scanUrlsAndAddPlugins( -ClassLoader loader, -URL[] urls -) { -PluginScanResult plugins = scanPluginPath(loader, urls); -log.info("Registered loader: {}", loader); Review Comment: This and other similar log messages are now in PluginScanner, and print out during the same phase of plugin scanning as they did before.
[GitHub] [kafka] jolshan opened a new pull request, #13918: DO NOT MERGE -- testing
jolshan opened a new pull request, #13918: URL: https://github.com/apache/kafka/pull/13918 Trying to debug flaky test that only seems to fail on CI. This is a version of https://github.com/apache/kafka/pull/13787 where I comment out some of the noisy logs and include some less noisy ones. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1242874939 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time private val inflightNodes = mutable.HashSet[Node]() private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + private val metricsGroup = new KafkaMetricsGroup(this.getClass) + val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS) Review Comment: What is the version if we don't send the request? I think this makes sense for API-specific metrics, but maybe not for the ones on the manager.
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1242731206 ## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ## @@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() { return new AddPartitionsToTxnRequest(new AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), version()); } +public boolean verifyOnlyRequest() { Review Comment: I remember why I made this choice. We want all the requests to be verify only because the idea is that verify only requests require in memory checks only. Adding the partition otherwise requires a write to the log. In the case that any partition needs to be added and not just verified, we have that log write which will always take the majority of the time. In this case, it makes sense to group with the normal add partitions requests. However, in the case where all transactions are verify only, we will see faster handling and therefore want to separate the requests to different metrics as to not bring down the average for metrics like request timing.
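For readers following the thread, the shape of the predicate being discussed is roughly the following. This is a sketch only, not the PR's actual code: the transaction accessor and field names on the generated AddPartitionsToTxnRequestData are abbreviated and may differ in the real class.

```java
// Sketch: true when every transaction in a version 4+ request carries the
// verify-only flag, i.e. the broker performs in-memory checks and never
// writes a partition addition to the log.
public boolean verifyOnlyRequest() {
    return version() > 3 && data().transactions().stream()
            .allMatch(transaction -> transaction.verifyOnly());
}
```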
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1242867082 ## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ## @@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() { return new AddPartitionsToTxnRequest(new AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), version()); } +public boolean verifyOnlyRequest() { +return version() > 3 && Review Comment: Unfortunately that introduces a dependency on main in clients. I can add such a constant to this request file if that helps.
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1242860978 ## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator.group + +import kafka.server.ReplicaManager +import kafka.utils.Logging +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.NotLeaderOrFollowerException +import org.apache.kafka.common.record.{FileRecords, MemoryRecords} +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, UnknownRecordTypeException} +import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback} +import org.apache.kafka.server.util.KafkaScheduler +import org.apache.kafka.storage.internals.log.FetchIsolation + +import java.nio.ByteBuffer +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicBoolean +import scala.jdk.CollectionConverters._ + +/** + * Coordinator loader which reads records from a partition and replays them + * to a group coordinator. + * + * @param replicaManager The replica manager. + * @param deserializer The deserializer to use. + * @param loadBufferSize The load buffer size. + * @tparam T The record type. + */ +class CoordinatorLoaderImpl[T]( Review Comment: Where are we creating this loader btw? Like BrokerServer?
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1242855495 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java: ## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RecordSerdeTest { +@Test +public void testSerializeKey() { +RecordSerde serializer = new RecordSerde(); +Record record = new Record( +new ApiMessageAndVersion( +new ConsumerGroupMetadataKey().setGroupId("group"), +(short) 1 +), +new ApiMessageAndVersion( +new ConsumerGroupMetadataValue().setEpoch(10), +(short) 0 +) +); + +assertArrayEquals( +MessageUtil.toVersionPrefixedBytes(record.key().version(), record.key().message()), +serializer.serializeKey(record) +); +} +
+@Test +public void testSerializeValue() { +RecordSerde serializer = new RecordSerde(); +Record record = new Record( +new ApiMessageAndVersion( +new ConsumerGroupMetadataKey().setGroupId("group"), +(short) 1 +), +new ApiMessageAndVersion( +new ConsumerGroupMetadataValue().setEpoch(10), +(short) 0 +) +); + +assertArrayEquals( +MessageUtil.toVersionPrefixedBytes(record.value().version(), record.value().message()), +serializer.serializeValue(record) +); +} + +@Test +public void testSerializeNullValue() { +RecordSerde serializer = new RecordSerde(); +Record record = new Record( +new ApiMessageAndVersion( +new ConsumerGroupMetadataKey().setGroupId("group"), +(short) 1 +), +null +); + +assertNull(serializer.serializeValue(record)); +} + +@Test +public void testDeserialize() { +RecordSerde serDe = new RecordSerde(); + +ApiMessageAndVersion key = new ApiMessageAndVersion( +new Consu
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1242849519 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java: ## @@ -26,7 +27,40 @@ * * @param <T> The type of the record. */ -public interface CoordinatorLoader<T> { +public interface CoordinatorLoader<T> extends AutoCloseable { + +/** + * UnknownRecordTypeException is thrown when the Deserializer encounters + * an unknown record type. + */ +class UnknownRecordTypeException extends RuntimeException { +private final short unknownType; + +public UnknownRecordTypeException(short unknownType) { +super(String.format("Found an unknown record type %d", unknownType)); +this.unknownType = unknownType; +} + +public short unknownType() { +return unknownType; Review Comment: Ah I see in the tests.
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1242822664 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java: ## @@ -26,7 +27,40 @@ * * @param <T> The type of the record. */ -public interface CoordinatorLoader<T> { +public interface CoordinatorLoader<T> extends AutoCloseable { + +/** + * UnknownRecordTypeException is thrown when the Deserializer encounters + * an unknown record type. + */ +class UnknownRecordTypeException extends RuntimeException { +private final short unknownType; + +public UnknownRecordTypeException(short unknownType) { +super(String.format("Found an unknown record type %d", unknownType)); +this.unknownType = unknownType; +} + +public short unknownType() { +return unknownType; Review Comment: Do we use this anywhere?
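For readers following along: the loader is the intended consumer of this exception. A fragment sketching the pattern, with hypothetical variable names (the real loop lives in the Scala CoordinatorLoaderImpl, where the record type id may come from a build newer than the broker's):

```java
// Illustrative only: skip records whose type id this broker does not
// understand, instead of failing the whole partition load.
try {
    coordinator.replay(deserializer.deserialize(keyBuffer, valueBuffer));
} catch (CoordinatorLoader.UnknownRecordTypeException e) {
    log.warn("Skipping record with unknown type {} while loading {}.", e.unknownType(), tp);
}
```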
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1242819355 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java: ## @@ -26,7 +27,40 @@ * * @param <T> The type of the record. */ -public interface CoordinatorLoader<T> { +public interface CoordinatorLoader<T> extends AutoCloseable { + +/** + * UnknownRecordTypeException is thrown when the Deserializer encounters + * an unknown record type. + */ +class UnknownRecordTypeException extends RuntimeException { Review Comment: Did we previously not have such an exception? I'm surprised 😅
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1242803548 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer for {{@link Record}}. + */ +public class RecordSerde implements PartitionWriter.Serializer<Record>, CoordinatorLoader.Deserializer<Record> { +@Override +public byte[] serializeKey(Record record) { +// Record does not accept a null key. +return MessageUtil.toVersionPrefixedBytes( +record.key().version(), +record.key().message() +); +} + +@Override +public byte[] serializeValue(Record record) { +// Tombstone is represented with a null value.
+if (record.value() == null) { +return null; +} else { +return MessageUtil.toVersionPrefixedBytes( +record.value().version(), +record.value().message() +); +} +} + +@Override +public Record deserialize( +ByteBuffer keyBuffer, +ByteBuffer valueBuffer +) throws RuntimeException { +final short recordType = readVersion(keyBuffer, "key"); +final ApiMessage keyMessage = apiMessageKeyFor(recordType); +readMessage(keyMessage, keyBuffer, recordType, "key"); + +if (valueBuffer == null) { +return new Record(new ApiMessageAndVersion(keyMessage, recordType), null); +} + +final ApiMessage valueMessage = apiMessageValueFor(recordType); +final short valueVersion = readVersion(valueBuffer, "value"); Review Comment: This is also just to distinguish the various records right? Any reason why the key is recordType, but the value is valueVersion?
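To make the framing concrete: both buffers start with a two-byte prefix written by MessageUtil.toVersionPrefixedBytes. For the key, that short identifies the record type; for the value, it is the schema version of that record type. A runnable illustration (the (short) 1 here simply mirrors the value used in the tests above, not a meaningful type id):

```java
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;

import java.nio.ByteBuffer;

public class VersionPrefixDemo {
    public static void main(String[] args) {
        ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey().setGroupId("group");
        byte[] bytes = MessageUtil.toVersionPrefixedBytes((short) 1, key);
        // The serde reads this prefix back first: record type for keys,
        // schema version for values.
        System.out.println("prefix = " + ByteBuffer.wrap(bytes).getShort()); // prints 1
    }
}
```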
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1242789616 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer for {{@link Record}}. + */ +public class RecordSerde implements PartitionWriter.Serializer<Record>, CoordinatorLoader.Deserializer<Record> { Review Comment: Did we want to include a reference to the group coordinator in the class name? Or is it enough to be in this classpath?
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1242786634 ## core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala: ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator.group + +import kafka.log.UnifiedLog +import kafka.server.ReplicaManager +import kafka.utils.TestUtils +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.NotLeaderOrFollowerException +import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecords, SimpleRecord} +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.UnknownRecordTypeException +import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback} +import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, LogOffsetMetadata} +import org.apache.kafka.test.TestUtils.assertFutureThrows +import org.junit.jupiter.api.Assertions.{assertEquals, assertNull} +import org.junit.jupiter.api.{Test, Timeout} +import org.mockito.{ArgumentCaptor, ArgumentMatchers} +import org.mockito.Mockito.{mock, verify, when} + +import java.nio.ByteBuffer +import java.nio.charset.Charset +import java.util.concurrent.{CountDownLatch, TimeUnit} + +class StringKeyValueDeserializer extends CoordinatorLoader.Deserializer[(String, String)] { + override def deserialize(key: ByteBuffer, value: ByteBuffer): (String, String) = { +( + Charset.defaultCharset().decode(key).toString, + Charset.defaultCharset().decode(value).toString +) + } +} + +@Timeout(60) +class CoordinatorLoaderImplTest { + @Test + def testNonexistentPartition(): Unit = { +val tp = new TopicPartition("foo", 0) +val replicaManager = mock(classOf[ReplicaManager]) +val serde = mock(classOf[CoordinatorLoader.Deserializer[(String, String)]]) +val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + +TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + replicaManager = replicaManager, + deserializer = serde, + loadBufferSize = 1000 +)) { loader => + when(replicaManager.getLog(tp)).thenReturn(None) + + val result = loader.load(tp, coordinator) + assertFutureThrows(result, classOf[NotLeaderOrFollowerException]) +} + } + + @Test + def testLoadingIsRejectedWhenClosed(): Unit = { +val tp = new TopicPartition("foo", 0) +val replicaManager = mock(classOf[ReplicaManager]) +val serde = mock(classOf[CoordinatorLoader.Deserializer[(String, String)]]) +val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + +TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + replicaManager = replicaManager, + deserializer = serde, + loadBufferSize = 1000 +)) { loader => + loader.close() + + val result = loader.load(tp, coordinator)
+ assertFutureThrows(result, classOf[RuntimeException]) +} + } + + @Test + def testLoading(): Unit = { +val tp = new TopicPartition("foo", 0) +val replicaManager = mock(classOf[ReplicaManager]) +val serde = new StringKeyValueDeserializer +val log = mock(classOf[UnifiedLog]) +val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + +TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + replicaManager = replicaManager, + deserializer = serde, + loadBufferSize = 1000 +)) { loader => + when(replicaManager.getLog(tp)).thenReturn(Some(log)) + when(log.logStartOffset).thenReturn(0L) + when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L)) + + val readResult1 = logReadResult(startOffset = 0, records = Seq( +new SimpleRecord("k1".getBytes, "v1".getBytes), +new SimpleRecord("k2".getBytes, "v2".getBytes) + )) + + when(log.read( +startOffset = 0L, +maxLength = 1000, +isolation = FetchIsolation.LOG_END, +minOneMessage = true + )).thenReturn(readResult1) + + val readResult2 = logReadResult(startOffset = 2, records = Seq( +new SimpleRecord("k3".getBytes, "v3".getBytes), +new SimpleRecor
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1242724200 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time private val inflightNodes = mutable.HashSet[Node]() private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + private val metricsGroup = new KafkaMetricsGroup(this.getClass) + val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS) + val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs") Review Comment: Sorry if this is a bit confusing, but the plan was to put the metrics not on the request type but rather on the manager object. This is why we report the metrics even when we don't send the verification (AddPartitionsToTxn) request. We can use the AddPartitionsToTxn Verify requests to get the timing and error rates from the API. However, these metrics are intended to capture the full end-to-end handling of the verification. This means the timing will also include any queuing/waiting for sending to the coordinator, and we can get error rates when we don't send the request (i.e., a repeat produce request with the same producer id). They are meant to supplement the API metrics.
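A standalone approximation of the measurement pattern described, using the Yammer metrics library directly rather than Kafka's KafkaMetricsGroup wrapper. The metric names mirror the PR; doVerify is a hypothetical stand-in for the full queue, send, and callback round trip:

```java
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;

import java.util.concurrent.TimeUnit;

public class VerificationMetricsSketch {
    private static final Meter FAILURE_RATE = Metrics.newMeter(
            VerificationMetricsSketch.class, "VerificationFailureRate", "failures", TimeUnit.SECONDS);
    private static final Histogram TIME_MS = Metrics.newHistogram(
            VerificationMetricsSketch.class, "VerificationTimeMs");

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        boolean verified = doVerify(); // stand-in for queueing, sending, and the callback
        // Timing covers the whole path, including cases where no request is sent.
        TIME_MS.update(System.currentTimeMillis() - start);
        if (!verified) {
            FAILURE_RATE.mark();
        }
        System.out.printf("verified=%s meanTimeMs=%.1f%n", verified, TIME_MS.mean());
    }

    static boolean doVerify() {
        return true; // hypothetical verification outcome
    }
}
```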
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1242721231

## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
## @@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
         return new AddPartitionsToTxnRequest(new AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), version());
     }

+    public boolean verifyOnlyRequest() {

Review Comment: Let me think on this a bit -- maybe we don't need all the fields to be verify-only. If I change anything, I will also add a comment explaining why.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1242719150

## core/src/test/scala/unit/kafka/utils/TestUtils.scala:
## @@ -2158,6 +2158,12 @@ object TestUtils extends Logging {
     KafkaYammerMetrics.defaultRegistry.removeMetric(metricName)
   }

+  def clearYammerMetric(metricName: String): Unit = {

Review Comment: I meant to remove this. Thanks for reminding me.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13915: KAFKA-14930: Document the new PATCH and DELETE offsets REST APIs for Connect
C0urante commented on code in PR #13915: URL: https://github.com/apache/kafka/pull/13915#discussion_r1242245895

## docs/connect.html:
## @@ -301,7 +301,7 @@ REST API
     GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
     GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
     PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed. Any resources claimed by its tasks are left allocated, which allows the connector to begin processing data quickly once it is resumed.
-    PUT /connectors/{name}/stop - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed.
+    PUT /connectors/{name}/stop - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. Note that the offsets for a connector can be modified via the offsets management REST APIs only if it is in the stopped state.

Review Comment:
```suggestion
    PUT /connectors/{name}/stop - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. Note that the offsets for a connector can be only modified via the offsets management endpoints if it is in the stopped state
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
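As context for the stop-state requirement being documented here, a minimal sketch of the flow against a worker's REST API, assuming a worker listening at localhost:8083 and a hypothetical connector named my-connector:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StopThenResetOffsets {
    public static void main(String[] args) throws Exception {
        String worker = "http://localhost:8083";  // assumed worker address
        String connector = "my-connector";        // hypothetical connector name
        HttpClient client = HttpClient.newHttpClient();

        // 1. Stop the connector; offsets can only be modified in the STOPPED state.
        HttpRequest stop = HttpRequest.newBuilder()
                .uri(URI.create(worker + "/connectors/" + connector + "/stop"))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
        System.out.println(client.send(stop, HttpResponse.BodyHandlers.ofString()).statusCode());

        // 2. Delete (reset) the connector's offsets via the offsets management endpoint.
        HttpRequest reset = HttpRequest.newBuilder()
                .uri(URI.create(worker + "/connectors/" + connector + "/offsets"))
                .DELETE()
                .build();
        System.out.println(client.send(reset, HttpResponse.BodyHandlers.ofString()).body());
    }
}
```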
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13884: MINOR: fix typos for client
divijvaidya commented on code in PR #13884: URL: https://github.com/apache/kafka/pull/13884#discussion_r1242669461

## clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java:
## @@ -854,7 +854,7 @@ public void shouldThrowOnInvalidDateFormatOrNullTimestamp() {
     private void checkExceptionForGetDateTimeMethod(Executable executable) {
         assertTrue(assertThrows(ParseException.class, executable)
-            .getMessage().contains("Unparseable date"));
+            .getMessage().contains("Unparsable date"));

Review Comment: we have some test failures associated with this: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13884/3/testReport/junit/org.apache.kafka.common.utils/UtilsTest/Build___JDK_11_and_Scala_2_13___shouldThrowOnInvalidDateFormatOrNullTimestamp__/ Please fix the source code along with the test validation.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
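A minimal, hypothetical illustration of why the source message and the test assertion must change together; the method and class names below are stand-ins, not Kafka's actual Utils code:

```java
import java.text.ParseException;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

class DateParsingTest {
    static long getDateTime(String timestamp) throws ParseException {
        // Production side: if this message says "Unparseable" while the test
        // expects "Unparsable" (or vice versa), the assertion below fails.
        throw new ParseException("Unparsable date: " + timestamp, 0);
    }

    @org.junit.jupiter.api.Test
    void shouldThrowOnInvalidDate() {
        ParseException e = assertThrows(ParseException.class, () -> getDateTime("not-a-date"));
        assertTrue(e.getMessage().contains("Unparsable date"));
    }
}
```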
[GitHub] [kafka] divijvaidya commented on pull request #13831: KAFKA-15053: Use case insensitive validator for security.protocol config
divijvaidya commented on PR #13831: URL: https://github.com/apache/kafka/pull/13831#issuecomment-1608088036 @C0urante ah! we have fancy stuff. Thanks for letting me know. @bogao007 seems like we might not need the docs change for config after all. The only remaining fix is the small nit. We should be ready to merge (assuming sane CI tests) after that (unless @C0urante has some additional comments?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-8977) Remove MockStreamsMetrics Since it is not a Mock
[ https://issues.apache.org/jira/browse/KAFKA-8977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joobi S B reassigned KAFKA-8977: Assignee: Joobi S B > Remove MockStreamsMetrics Since it is not a Mock > > > Key: KAFKA-8977 > URL: https://issues.apache.org/jira/browse/KAFKA-8977 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: Joobi S B >Priority: Minor > Labels: newbie > > The class {{MockStreamsMetrics}} is used throughout unit tests as a mock but > it is not really a mock since it only hides two parameters of the > {{StreamsMetricsImpl}} constructor. Either a real mock or the real > {{StreamsMetricsImpl}} should be used in the tests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15050) Prompts in the quickstarts
[ https://issues.apache.org/jira/browse/KAFKA-15050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737315#comment-17737315 ]

Joobi S B commented on KAFKA-15050:
---

Hi [~tombentley], I've updated the PR with the latest comments; could you please have a look: [https://github.com/apache/kafka/pull/13862|https://github.com/apache/kafka/pull/13862]

> Prompts in the quickstarts
> --
>
> Key: KAFKA-15050
> URL: https://issues.apache.org/jira/browse/KAFKA-15050
> Project: Kafka
> Issue Type: Improvement
> Components: documentation
> Reporter: Tom Bentley
> Assignee: Joobi S B
> Priority: Trivial
> Labels: newbie
>
> In the quickstarts [Steps 1-5|https://kafka.apache.org/documentation/#quickstart] use {{$}} to indicate the command prompt. When we start to use Kafka Connect in [Step 6|https://kafka.apache.org/documentation/#quickstart_kafkaconnect] we switch to {{>}}. The [Kafka Streams quickstart|https://kafka.apache.org/documentation/streams/quickstart] also uses {{>}}. I don't think there's a reason for this, but if there is one (root vs user account?) it should be explained.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
divijvaidya commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1242585330

## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
## @@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
         return new AddPartitionsToTxnRequest(new AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), version());
     }

+    public boolean verifyOnlyRequest() {
+        return version() > 3 &&
+            data.transactions().stream().filter(transaction -> transaction.verifyOnly()).toArray().length == data.transactions().size();

Review Comment: could be replaced with `data.transactions().stream().allMatch(AddPartitionsToTxnTransaction::verifyOnly)`, which would let the JDK library perform this check optimally.

## core/src/main/scala/kafka/network/RequestChannel.scala:
## @@ -240,16 +240,17 @@ object RequestChannel extends Logging {
     val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
     val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
     val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-    val fetchMetricNames =
+    val metricNames =

Review Comment: s/metricNames/overrideMetricNames?

## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
## @@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()

+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment: we are using `VerificationFailureRate` at a couple of places, such as shutdown and in tests. Please move it to a constant in the companion object.

## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
## @@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()

+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment: you might be interested in adding "version" to the tags

## core/src/test/scala/unit/kafka/utils/TestUtils.scala:
## @@ -2158,6 +2158,12 @@ object TestUtils extends Logging {
     KafkaYammerMetrics.defaultRegistry.removeMetric(metricName)
   }

+  def clearYammerMetric(metricName: String): Unit = {

Review Comment: where are we using this?

## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
## @@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
         return new AddPartitionsToTxnRequest(new AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), version());
     }

+    public boolean verifyOnlyRequest() {
+        return version() > 3 &&

Review Comment: (optional) May I suggest adding a constant, `EARLIEST_SUPPORTED_VERSION`, to `AddPartitionsToTxnManager` and using the constant over here. It greatly helps code readability.

## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
## @@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()

+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)
+  val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs")

Review Comment: We probably want this (and the failure) metric to be consistent with the metrics we have for other requests. To make it consistent, we will have to do things like: 1) use a biased histogram; 2) add tags such as request=AddPartitionsToTxn. Please reference RequestChannel#RequestMetrics for how other APIs capture metrics.

## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
## @@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
         return new AddPartitionsToTxnRequest(new AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), version());
     }

+    public boolean verifyOnlyRequest() {

Review Comment: It would be nice if we could add a Javadoc saying that we expect all requests from clients which are version > 3 to contain verifyOnly
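The two predicates being compared, written out side by side with a stand-in transaction type; allMatch can short-circuit on the first non-verify-only transaction instead of materializing an array:

```java
import java.util.List;

final class VerifyOnlyPredicates {
    interface Tx { boolean verifyOnly(); }   // stand-in for AddPartitionsToTxnTransaction

    // Original form: materializes an array just to compare lengths.
    static boolean viaFilter(List<Tx> transactions) {
        return transactions.stream().filter(Tx::verifyOnly).toArray().length == transactions.size();
    }

    // Suggested form: short-circuits on the first non-verify-only transaction.
    static boolean viaAllMatch(List<Tx> transactions) {
        return transactions.stream().allMatch(Tx::verifyOnly);
    }
}
```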
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1242623544

## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
## @@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.ReplicaManager
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, UnknownRecordTypeException}
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback}
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.storage.internals.log.FetchIsolation
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.jdk.CollectionConverters._
+
+/**
+ * Coordinator loader which reads records from a partition and replays them
+ * to a group coordinator.
+ *
+ * @param replicaManager The replica manager.
+ * @param deserializer   The deserializer to use.
+ * @param loadBufferSize The load buffer size.
+ * @tparam T The record type.
+ */
+class CoordinatorLoaderImpl[T](
+  replicaManager: ReplicaManager,
+  deserializer: Deserializer[T],
+  loadBufferSize: Int
+) extends CoordinatorLoader[T] with Logging {
+  private val isRunning = new AtomicBoolean(true)
+  private val scheduler = new KafkaScheduler(1)
+  scheduler.startup()
+
+  /**
+   * Loads the coordinator by reading all the records from the TopicPartition
+   * and applying them to the Replayable object.
+   *
+   * @param tp          The TopicPartition to read from.
+   * @param coordinator The object to apply records to.
+   */
+  override def load(
+    tp: TopicPartition,
+    coordinator: CoordinatorPlayback[T]
+  ): CompletableFuture[Void] = {
+    val future = new CompletableFuture[Void]()
+    val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
+      () => doLoad(tp, coordinator, future))
+    if (result.isCancelled) {
+      future.completeExceptionally(new RuntimeException("Coordinator loader is closed."))
+    }
+    future
+  }
+
+  private def doLoad(
+    tp: TopicPartition,
+    coordinator: CoordinatorPlayback[T],
+    future: CompletableFuture[Void]
+  ): Unit = {
+    try {
+      replicaManager.getLog(tp) match {
+        case None =>
+          future.completeExceptionally(new NotLeaderOrFollowerException(
+            s"Could not load records from $tp because the log does not exist."))
+
+        case Some(log) =>
+          def logEndOffset: Long = replicaManager.getLogEndOffset(tp).getOrElse(-1L)
+
+          // buffer may not be needed if records are read from memory
+          var buffer = ByteBuffer.allocate(0)
+          // loop breaks if leader changes at any time during the load, since logEndOffset is -1
+          var currOffset = log.logStartOffset
+          // loop breaks if no records have been read, since the end of the log has been reached
+          var readAtLeastOneRecord = true
+
+          while (currOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
+            val fetchDataInfo = log.read(
+              startOffset = currOffset,
+              maxLength = loadBufferSize,
+              isolation = FetchIsolation.LOG_END,
+              minOneMessage = true
+            )
+
+            readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
+
+            val memoryRecords = (fetchDataInfo.records: @unchecked) match {
+              case records: MemoryRecords =>
+                records
+
+              case fileRecords: FileRecords =>
+                val sizeInBytes = fileRecords.sizeInBytes
+                val bytesNeeded = Math.max(loadBufferSize, sizeInBytes)
+
+                // minOneMessage = true in the above log.read means that the buffer may need to
+                // be grown to ensure progress can be made.
+                if (buffer.capacity < bytesNeeded) {
+                  if (loadBufferSize < bytesNeed
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1242616962

## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
## @@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.ReplicaManager
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, UnknownRecordTypeException}
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback}
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.storage.internals.log.FetchIsolation
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.jdk.CollectionConverters._
+
+/**
+ * Coordinator loader which reads records from a partition and replays them
+ * to a group coordinator.
+ *
+ * @param replicaManager The replica manager.
+ * @param deserializer   The deserializer to use.
+ * @param loadBufferSize The load buffer size.
+ * @tparam T The record type.
+ */
+class CoordinatorLoaderImpl[T](
+  replicaManager: ReplicaManager,
+  deserializer: Deserializer[T],
+  loadBufferSize: Int
+) extends CoordinatorLoader[T] with Logging {
+  private val isRunning = new AtomicBoolean(true)
+  private val scheduler = new KafkaScheduler(1)
+  scheduler.startup()
+
+  /**
+   * Loads the coordinator by reading all the records from the TopicPartition
+   * and applying them to the Replayable object.
+   *
+   * @param tp          The TopicPartition to read from.
+   * @param coordinator The object to apply records to.
+   */
+  override def load(
+    tp: TopicPartition,
+    coordinator: CoordinatorPlayback[T]
+  ): CompletableFuture[Void] = {
+    val future = new CompletableFuture[Void]()
+    val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
+      () => doLoad(tp, coordinator, future))
+    if (result.isCancelled) {

Review Comment: This works because we schedule a no-op task when the scheduler is not running?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
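A generic sketch of the pattern being asked about, not KafkaScheduler's actual implementation: a scheduler that hands back an already-cancelled future once it has been closed, which lets the caller fail its CompletableFuture instead of hanging:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

final class ClosableScheduler {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean running = new AtomicBoolean(true);

    Future<?> scheduleOnce(Runnable task) {
        if (!running.get()) {
            // Equivalent of scheduling a no-op: a future that is already cancelled.
            FutureTask<Void> cancelled = new FutureTask<>(() -> null);
            cancelled.cancel(false);
            return cancelled;
        }
        return executor.schedule(task, 0, TimeUnit.MILLISECONDS);
    }

    CompletableFuture<Void> load(Runnable doLoad) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        Future<?> result = scheduleOnce(doLoad);
        if (result.isCancelled()) {
            future.completeExceptionally(new RuntimeException("Coordinator loader is closed."));
        }
        return future;
    }

    void close() {
        running.set(false);
        executor.shutdown();
    }
}
```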
[GitHub] [kafka] C0urante commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest
C0urante commented on code in PR #13383: URL: https://github.com/apache/kafka/pull/13383#discussion_r1242606914

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
## @@ -706,95 +662,72 @@ public void testSourceTaskIgnoresProducerException() throws Exception {
         // and no ConnectException will be thrown
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, offset2, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-        expectOffsetFlush(true);
-        expectSendRecordOnce();
-        expectSendRecordProducerCallbackFail();
-        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull());
-        //As of KAFKA-14079 all offsets should be committed, even for failed records (if ignored)
-        //Only the last offset will be passed to the method as everything up to that point is committed
-        //Before KAFKA-14079 offset 12 would have been passed and not 13 as it would have been unacked
-        offsetWriter.offset(PARTITION, offset2);
-        PowerMock.expectLastCall();
+        expectOffsetFlush();
+        expectPreliminaryCalls();

-        PowerMock.replayAll();
+        when(producer.send(any(ProducerRecord.class), any(Callback.class)))
+                .thenAnswer(producerSendAnswer(true))
+                .thenAnswer(producerSendAnswer(false));

         //Send records and then commit offsets and verify both were committed and no exception
-        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
-        Whitebox.invokeMethod(workerTask, "sendRecords");
-        Whitebox.invokeMethod(workerTask, "updateCommittableOffsets");
+        workerTask.toSend = Arrays.asList(record1, record2);
+        workerTask.sendRecords();
+        workerTask.updateCommittableOffsets();
         workerTask.commitOffsets();

-        PowerMock.verifyAll();
+        //As of KAFKA-14079 all offsets should be committed, even for failed records (if ignored)
+        //Only the last offset will be passed to the method as everything up to that point is committed
+        //Before KAFKA-14079 offset 12 would have been passed and not 13 as it would have been unacked
+        verify(offsetWriter).offset(PARTITION, offset2);
+        verify(sourceTask).commitRecord(any(SourceRecord.class), isNull());

         //Double check to make sure all submitted records were cleared
-        assertEquals(0, ((SubmittedRecords) Whitebox.getInternalState(workerTask,
-                "submittedRecords")).records.size());
+        assertEquals(0, workerTask.submittedRecords.records.size());
     }

     @Test
     public void testSlowTaskStart() throws Exception {
         final CountDownLatch startupLatch = new CountDownLatch(1);
         final CountDownLatch finishStartupLatch = new CountDownLatch(1);
-
         createWorkerTask();
-        offsetStore.start();
-        EasyMock.expectLastCall();
-        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
-        EasyMock.expectLastCall();
-        sourceTask.start(TASK_PROPS);
-        EasyMock.expectLastCall().andAnswer(() -> {
+        doAnswer((Answer) invocation -> {
             startupLatch.countDown();
-            assertTrue(awaitLatch(finishStartupLatch));
+            ConcurrencyUtils.awaitLatch(finishStartupLatch, "Timeout waiting for task to stop");

Review Comment: The purpose of `finishStartupLatch` is to ensure that we invoke `WorkerTask::stop` while the `WorkerTask` (on its separate thread) is in the middle of invoking `SourceTask::start`. We should add that logic back. I was only commenting on the error message; the rest of the test case looked correct.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
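A condensed sketch of the latch handshake C0urante describes, with simplified stand-ins for the test's collaborators; it holds start() open on the worker thread so stop-like logic can run while start() is still in flight:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

class SlowStartHarness {
    interface SourceTask { void start(); }   // stand-in for the real task interface

    void run() throws InterruptedException {
        CountDownLatch startupLatch = new CountDownLatch(1);
        CountDownLatch finishStartupLatch = new CountDownLatch(1);

        SourceTask task = mock(SourceTask.class);
        doAnswer(invocation -> {
            startupLatch.countDown();                      // signal: start() has begun
            finishStartupLatch.await(5, TimeUnit.SECONDS); // hold start() open until released
            return null;
        }).when(task).start();

        Thread worker = new Thread(task::start);
        worker.start();

        startupLatch.await(5, TimeUnit.SECONDS); // wait until the task is mid-start
        // ... invoke stop() here, while start() is still in flight ...
        finishStartupLatch.countDown();          // let start() finish
        worker.join();
    }
}
```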
[jira] [Created] (KAFKA-15126) Change range queries to accept null lower and upper bounds
Lucia Cerchie created KAFKA-15126:
-
Summary: Change range queries to accept null lower and upper bounds
Key: KAFKA-15126
URL: https://issues.apache.org/jira/browse/KAFKA-15126
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Lucia Cerchie
Assignee: Lucia Cerchie

When web client requests come in with query params, it's common for those params to be null. We want developers to be able to pass in just the upper/lower bounds if they want, instead of implementing their own logic to avoid getting the whole range (which will happen if they leave the params null).

An example of the logic they can avoid writing once this KIP is implemented is below:

{code:java}
private RangeQuery> createRangeQuery(String lower, String upper) {
    if (isBlank(lower) && isBlank(upper)) {
        return RangeQuery.withNoBounds();
    } else if (!isBlank(lower) && isBlank(upper)) {
        return RangeQuery.withLowerBound(lower);
    } else if (isBlank(lower) && !isBlank(upper)) {
        return RangeQuery.withUpperBound(upper);
    } else {
        return RangeQuery.withRange(lower, upper);
    }
}
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
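As a rough illustration of the semantics the ticket asks for (not Kafka Streams' actual RangeQuery implementation), a toy range type where a null bound simply means "unbounded on that side", so web-layer code can forward possibly-null query parameters directly:

{code:java}
import java.util.Optional;

final class Range<K extends Comparable<K>> {
    final Optional<K> lower;
    final Optional<K> upper;

    private Range(K lower, K upper) {
        this.lower = Optional.ofNullable(lower);
        this.upper = Optional.ofNullable(upper);
    }

    // Single entry point replacing the four-way branch in the description above.
    static <K extends Comparable<K>> Range<K> withRange(K lower, K upper) {
        return new Range<>(lower, upper);
    }

    boolean contains(K key) {
        return lower.map(l -> l.compareTo(key) <= 0).orElse(true)
            && upper.map(u -> u.compareTo(key) >= 0).orElse(true);
    }
}
{code}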
[GitHub] [kafka] cmccabe closed pull request #7646: KAFKA-7504: prepare FetchResponses in the request handler threads
cmccabe closed pull request #7646: KAFKA-7504: prepare FetchResponses in the request handler threads URL: https://github.com/apache/kafka/pull/7646 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe closed pull request #4442: Add TestKit
cmccabe closed pull request #4442: Add TestKit URL: https://github.com/apache/kafka/pull/4442 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #4442: Add TestKit
cmccabe commented on PR #4442: URL: https://github.com/apache/kafka/pull/4442#issuecomment-1607963901 This has been implemented, although in a slightly different form. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe closed pull request #3794: KAFKA-5838. Speed up running system tests in docker a bit with better…
cmccabe closed pull request #3794: KAFKA-5838. Speed up running system tests in docker a bit with better… URL: https://github.com/apache/kafka/pull/3794 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jaewie commented on pull request #13891: Breaks down SocketServer classes into ConnectionQuotaEntity.scala and ConnectionQuotas.scala
jaewie commented on PR #13891: URL: https://github.com/apache/kafka/pull/13891#issuecomment-1607955171 Hi @kirktrue, I just created one here https://issues.apache.org/jira/browse/KAFKA-15125 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15125) Break down SocketServer classes into separate files
Jae Wie created KAFKA-15125: --- Summary: Break down SocketServer classes into separate files Key: KAFKA-15125 URL: https://issues.apache.org/jira/browse/KAFKA-15125 Project: Kafka Issue Type: Task Components: network Reporter: Jae Wie Since SocketServer is almost 2000 lines of code, it's easier to move its classes into separate files in order to make them more manageable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cmccabe closed pull request #3359: KAFKA-5062. Kafka brokers can accept malformed requests which allocat…
cmccabe closed pull request #3359: KAFKA-5062. Kafka brokers can accept malformed requests which allocat… URL: https://github.com/apache/kafka/pull/3359 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #13852: KAFKA-15086:Set a reasonable segment size upper limit for MM2 internal topics
C0urante commented on PR #13852: URL: https://github.com/apache/kafka/pull/13852#issuecomment-1607920082

@hudeqi sorry, this is a tricky issue and I'm trying to take time to think things through :)

I hate to say it, but I don't think we can make this change or anything like it without a KIP. This is for two reasons:

1. We're effectively changing the default value for the `offset.storage.topic.segment.bytes` property (even if we don't implement this change with that exact logic), which counts as a change to public API for the project
2. By explicitly setting a value for the offset topic's `segment.bytes` property, we cause any broker-side value for the [log.segment.bytes property](https://kafka.apache.org/documentation.html#brokerconfigs_log.segment.bytes) to be ignored. If the broker uses a lower value for this property than our default, then we may make things worse instead of better

I still think it's likely that decreasing the segment size for the offsets topic would help, but it'd be nice if we could get the kind of review that a KIP requires before making that kind of change.

As far as increasing the number of consumer threads goes, I think it's really a question of what the performance bottleneck is when reading to the end of the topic. If CPU is the issue, then sure, it'd probably help to scale up the number of consumers. However, if network transfer between the worker and the Kafka cluster is the limiting factor, then it won't have any impact. The nice thing about decreasing the segment size is that (as long as it leads to a reduction in the total size of the offsets topic), it would help in either case: you'd have less data to consume from Kafka, and also less data to process on your Connect worker.

This almost certainly varies depending on the environment Kafka Connect and Kafka are run in, but my hunch is that your fix here would be more effective than scaling up the number of consumers. I'd be curious to see if we could get benchmark numbers on that front, though.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
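For operators who want to experiment before any KIP lands, the segment size of the offsets topic can already be tuned externally with the Admin API; a minimal sketch, assuming the worker's offset.storage.topic is named connect-offsets, a broker on localhost:9092, and an illustrative 50 MiB segment size:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class ShrinkOffsetsTopicSegments {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Target the offsets topic; "connect-offsets" is an assumption here.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "connect-offsets");
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("segment.bytes", String.valueOf(50 * 1024 * 1024)),
                AlterConfigOp.OpType.SET);
            Collection<AlterConfigOp> ops = Collections.singletonList(op);
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
        }
    }
}
```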
[GitHub] [kafka] jsancio commented on a diff in pull request #13845: KAFKA-15078; KRaft leader replys with snapshot for offset 0
jsancio commented on code in PR #13845: URL: https://github.com/apache/kafka/pull/13845#discussion_r1235409277

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
## @@ -1017,7 +1017,16 @@ private FetchResponseData tryCompleteFetchRequest(
         long fetchOffset = request.fetchOffset();
         int lastFetchedEpoch = request.lastFetchedEpoch();
         LeaderState state = quorum.leaderStateOrThrow();
-        ValidOffsetAndEpoch validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+        Optional latestSnapshotId = log.latestSnapshotId();
+        final ValidOffsetAndEpoch validOffsetAndEpoch;
+        if (fetchOffset == 0 && latestSnapshotId.isPresent()) {

Review Comment: Thanks @dengziming. We had a similar conversation in another PR: https://github.com/apache/kafka/pull/13834#discussion_r1224779841 In short, it is not clear to me that these improvements (implementation complexities) are a big win for the cluster metadata partition.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13721: KAFKA-14782: Implementation Details Different from Documentation (del…
jolshan commented on PR #13721: URL: https://github.com/apache/kafka/pull/13721#issuecomment-1607896742

I think the safer thing would be to just update the text and any documentation so that this is not included in the calculation. I don't think the current state, without this change, has caused anyone issues.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13876: KAFKA-10733: Clean up producer exceptions
jolshan commented on PR #13876: URL: https://github.com/apache/kafka/pull/13876#issuecomment-1607891946

> - Keep things somewhat inconsistent
> - Do not wrap fatal exceptions and keep the exceptions as-is
> - Do wrap fatal exceptions and let the users deal with it
> - Always rethrow with the correct exception type, so we do wrap the exception (giving us the right stack traces), but we don't use KafkaException but call a new method rethrow on KafkaException that rethrows the exception. However, that would certainly require a new KIP.

I'm assuming these options are just for the ProducerFencedException and InvalidProducerEpochException? Or is this referring to the entire PR? I think those two exceptions are the biggest concerns, for the reasons you mention above. If that is the case, is the main reason for wrapping consistency?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on PR #13798: URL: https://github.com/apache/kafka/pull/13798#issuecomment-1607881974 The last builds are looking better. @divijvaidya please take a look when you get a chance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clayburn commented on pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights
clayburn commented on PR #13676: URL: https://github.com/apache/kafka/pull/13676#issuecomment-1607863528 @ijuma Under our Software License and Sponsorship Agreement, Apache's users and contributors may use the Gradle Enterprise instance and run any ASF builds with GE enabled solely in connection with the development of any ASF open source project. This applies even if the users are employed by a third party or making such contributions to an ASF open source project as part of their work as directed by a third party employer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15123) Add tests for ChunkedBytesStream
[ https://issues.apache.org/jira/browse/KAFKA-15123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Divij Vaidya reassigned KAFKA-15123:

Assignee: Max Riedel

> Add tests for ChunkedBytesStream
>
> Key: KAFKA-15123
> URL: https://issues.apache.org/jira/browse/KAFKA-15123
> Project: Kafka
> Issue Type: Improvement
> Reporter: Divij Vaidya
> Assignee: Max Riedel
> Priority: Minor
> Labels: newbie
>
> We need to add test cases against the public interfaces of this class, covering scenarios such as int overflow for the input parameters.
> Refer: [https://github.com/apache/kafka/pull/12948#discussion_r1242345211]

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner
C0urante commented on code in PR #13821: URL: https://github.com/apache/kafka/pull/13821#discussion_r1242437655

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
## @@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.components.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public abstract class PluginScanner {
+
+    private static final Logger log = LoggerFactory.getLogger(PluginScanner.class);
+
+    public PluginScanResult discoverPlugins(Set sources) {
+        long startMs = System.currentTimeMillis();
+        List results = new ArrayList<>();
+        for (PluginSource source : sources) {
+            results.add(scanUrlsAndAddPlugins(source));
+        }
+        long endMs = System.currentTimeMillis();
+        log.info("Scanning plugins with {} took {} ms", getClass().getSimpleName(), endMs - startMs);
+        return new PluginScanResult(results);
+    }
+
+    private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) {
+        PluginScanResult plugins = scanPlugins(source);
+        loadJdbcDrivers(source.loader());
+        return plugins;
+    }
+
+    protected abstract PluginScanResult scanPlugins(PluginSource source);
+
+    private void loadJdbcDrivers(final ClassLoader loader) {
+        // Apply here what java.sql.DriverManager does to discover and register classes
+        // implementing the java.sql.Driver interface.
+        AccessController.doPrivileged(
+            (PrivilegedAction) () -> {
+                ServiceLoader loadedDrivers = ServiceLoader.load(
+                    Driver.class,
+                    loader
+                );
+                Iterator driversIterator = loadedDrivers.iterator();
+                try {
+                    while (driversIterator.hasNext()) {
+                        Driver driver = driversIterator.next();
+                        log.debug(
+                            "Registered java.sql.Driver: {} to java.sql.DriverManager",
+                            driver
+                        );
+                    }
+                } catch (Throwable t) {
+                    log.debug(
+                        "Ignoring java.sql.Driver classes listed in resources but not"
+                            + " present in class loader's classpath: ",
+                        t
+                    );
+                }
+                return null;
+            }
+        );
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected PluginDesc pluginDesc(Class plugin, String version, ClassLoader loader) {
+        return new PluginDesc(plugin, version, loader);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected SortedSet> getServiceLoaderPluginDesc(Class klass, ClassLoader loader) {
+        SortedSet> result = new TreeSet<>();
+        ServiceLoader serviceLoader = ServiceLoader.load(klass, loader);
+        for (Iterator iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
+            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+                T pluginImpl;
+                try {
+                    pluginImpl = iterator.next();
+                } catch (ServiceConfigurationError t) {
+                    log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    continue;
+                }
+                Class pluginKlass = (Class) pluginImpl.getClass();
+                if (pluginKlass.getClassLoader() != loader) {
+                    log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
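A self-contained sketch of the ServiceLoader pattern this scanner builds on, with a hypothetical service interface: iterate lazily and survive one broken provider instead of aborting the whole scan:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;

final class SafeServiceDiscovery {
    static <T> List<T> discover(Class<T> service, ClassLoader loader) {
        List<T> found = new ArrayList<>();
        Iterator<T> it = ServiceLoader.load(service, loader).iterator();
        while (it.hasNext()) {
            try {
                found.add(it.next());  // may throw if one provider fails to instantiate
            } catch (ServiceConfigurationError e) {
                // Log and keep scanning; one bad manifest entry should not
                // hide every other plugin on the path.
                System.err.println("Skipping broken provider for " + service.getSimpleName() + ": " + e);
            }
        }
        return found;
    }
}
```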
[jira] [Commented] (KAFKA-15123) Add tests for ChunkedBytesStream
[ https://issues.apache.org/jira/browse/KAFKA-15123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737267#comment-17737267 ]

Divij Vaidya commented on KAFKA-15123:
--

Sure thing. Please go ahead. I assigned this to you.

> Add tests for ChunkedBytesStream
>
> Key: KAFKA-15123
> URL: https://issues.apache.org/jira/browse/KAFKA-15123
> Project: Kafka
> Issue Type: Improvement
> Reporter: Divij Vaidya
> Assignee: Max Riedel
> Priority: Minor
> Labels: newbie
>
> We need to add test cases against the public interfaces of this class, covering scenarios such as int overflow for the input parameters.
> Refer: [https://github.com/apache/kafka/pull/12948#discussion_r1242345211]

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-9015) Unify metric names
[ https://issues.apache.org/jira/browse/KAFKA-9015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-9015:
-
Labels: easy kip needs-kip newbie (was: easy kip newbie)

> Unify metric names
> --
>
> Key: KAFKA-9015
> URL: https://issues.apache.org/jira/browse/KAFKA-9015
> Project: Kafka
> Issue Type: Improvement
> Components: metrics
> Affects Versions: 2.5.0
> Reporter: Viktor Somogyi-Vass
> Assignee: Csenge Maruzsi
> Priority: Major
> Labels: easy, kip, needs-kip, newbie
>
> Some of the metrics use a lower-case style metric name like "i-am-a-metric" while the majority use UpperCamelCase (IAmAnotherMetric). We need consistency across the project, and since the majority of the metrics use the camel-case notation, we need to change the others.
> We might have to think about backward compatibility, and a KIP may be needed since metrics are public interfaces.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15118) Need for a Centralized Configuration Management System in Apache Kafka
[ https://issues.apache.org/jira/browse/KAFKA-15118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15118:
--
Labels: needs-kip (was: )

> Need for a Centralized Configuration Management System in Apache Kafka
> --
>
> Key: KAFKA-15118
> URL: https://issues.apache.org/jira/browse/KAFKA-15118
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 2.7.1
> Reporter: Jimmy Wang
> Priority: Major
> Labels: needs-kip
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> Hi all,
> In our use of Apache Kafka, we found the following strange behavior:
> Despite our partitions being well-distributed across brokers, we noticed a significant discrepancy in disk usage between different brokers. Specifically, the same partition takes up different amounts of disk space on different brokers. Upon investigating, we found that the root cause of this discrepancy was a variation in the {{log.retention.hours}} setting between brokers.
> On the one hand, we know we should make sure that server.properties is the same across the whole cluster. But on the other hand, could Kafka provide a guarantee, or an upfront check, that all broker configurations are the same, to avoid such situations?
> Here are some of my suggestions:
> # Provide a centralized configuration center, or manage these settings in internal topics (KRaft mode) or ZooKeeper, like dynamic configuration.
> # Warn users about potential inconsistencies during the broker startup process.
> Best regards,
> Jimmy Wang

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15118) Need for a Centralized Configuration Management System in Apache Kafka
[ https://issues.apache.org/jira/browse/KAFKA-15118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737245#comment-17737245 ]

Kirk True commented on KAFKA-15118:
---

[~jimmywang611] Thanks for the ticket. Centralized configuration is something we've talked about doing for _clients_, but brokers could benefit from this too. I'm not sure if this is something that should be handled inside Kafka or relegated to the orchestration layer outside of Kafka. Either way, this project would have a lot of ramifications, so a KIP would almost certainly be needed. Is that something that you plan to write up? Thanks!

> Need for a Centralized Configuration Management System in Apache Kafka
> --
>
> Key: KAFKA-15118
> URL: https://issues.apache.org/jira/browse/KAFKA-15118
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 2.7.1
> Reporter: Jimmy Wang
> Priority: Major
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> Hi all,
> In our use of Apache Kafka, we found the following strange behavior:
> Despite our partitions being well-distributed across brokers, we noticed a significant discrepancy in disk usage between different brokers. Specifically, the same partition takes up different amounts of disk space on different brokers. Upon investigating, we found that the root cause of this discrepancy was a variation in the {{log.retention.hours}} setting between brokers.
> On the one hand, we know we should make sure that server.properties is the same across the whole cluster. But on the other hand, could Kafka provide a guarantee, or an upfront check, that all broker configurations are the same, to avoid such situations?
> Here are some of my suggestions:
> # Provide a centralized configuration center, or manage these settings in internal topics (KRaft mode) or ZooKeeper, like dynamic configuration.
> # Warn users about potential inconsistencies during the broker startup process.
> Best regards,
> Jimmy Wang

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jsancio commented on a diff in pull request #13917: MINOR; Failed move should be logged at WARN
jsancio commented on code in PR #13917: URL: https://github.com/apache/kafka/pull/13917#discussion_r1242449562

## clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
## @@ -978,9 +978,9 @@ public static void atomicMoveWithFallback(Path source, Path target, boolean need
             Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
         } catch (IOException outer) {
             try {
+                log.warn("Failed atomic move of {} to {} retring with a non-atomic move", source, target, outer);
                 Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
-                log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target,
-                    outer.getMessage());
+                log.debug("Non-atomic move of {} to {} succeeded after atomic move failed", source, target);

Review Comment: Yeah. I think so. I care more about the atomic move failing and less about the non-atomic move succeeding.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15122) Moving partitions between log dirs leads to kafka.log:type=Log metrics being deleted
[ https://issues.apache.org/jira/browse/KAFKA-15122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-15122. Fix Version/s: 3.5.0 Resolution: Duplicate Duplicate of https://issues.apache.org/jira/browse/KAFKA-14544 which is fixed in 3.5.0 > Moving partitions between log dirs leads to kafka.log:type=Log metrics being > deleted > > > Key: KAFKA-15122 > URL: https://issues.apache.org/jira/browse/KAFKA-15122 > Project: Kafka > Issue Type: Task >Affects Versions: 3.5.0 >Reporter: Mickael Maison >Priority: Major > Fix For: 3.5.0 > > > # Start a broker with 2 log directories > # Create a topic-partition > Metrics with the following names are created: > kafka.log:type=Log,name=Size,topic=,partition=0 > # Using kafka-reassign-partitions move that partition to the other log > directory > A tag is-future=true is added to the existing metrics, > kafka.log:type=Log,name=Size,topic=,partition=0,is-future=true > # Using kafka-reassign-partitions move that partition back to its original > log directory > The metrics are deleted! > I don't expect the metrics to be renamed during the first reassignment. The > metrics should not be deleted during the second reassignment, the topic still > exists. Restarting the broker resolves the issue. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15122) Moving partitions between log dirs leads to kafka.log:type=Log metrics being deleted
[ https://issues.apache.org/jira/browse/KAFKA-15122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-15122: --- Affects Version/s: 3.4.0 (was: 3.5.0) > Moving partitions between log dirs leads to kafka.log:type=Log metrics being > deleted > > > Key: KAFKA-15122 > URL: https://issues.apache.org/jira/browse/KAFKA-15122 > Project: Kafka > Issue Type: Task >Affects Versions: 3.4.0 >Reporter: Mickael Maison >Priority: Major > Fix For: 3.5.0 > > > # Start a broker with 2 log directories > # Create a topic-partition > Metrics with the following names are created: > kafka.log:type=Log,name=Size,topic=,partition=0 > # Using kafka-reassign-partitions move that partition to the other log > directory > A tag is-future=true is added to the existing metrics, > kafka.log:type=Log,name=Size,topic=,partition=0,is-future=true > # Using kafka-reassign-partitions move that partition back to its original > log directory > The metrics are deleted! > I don't expect the metrics to be renamed during the first reassignment. The > metrics should not be deleted during the second reassignment, the topic still > exists. Restarting the broker resolves the issue. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] rondagostino commented on a diff in pull request #13917: MINOR; Failed move should be logged at WARN
rondagostino commented on code in PR #13917: URL: https://github.com/apache/kafka/pull/13917#discussion_r1242445724

## clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
## @@ -978,9 +978,9 @@ public static void atomicMoveWithFallback(Path source, Path target, boolean need
             Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
         } catch (IOException outer) {
             try {
+                log.warn("Failed atomic move of {} to {} retring with a non-atomic move", source, target, outer);
                 Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
-                log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target,
-                    outer.getMessage());
+                log.debug("Non-atomic move of {} to {} succeeded after atomic move failed", source, target);

Review Comment: Maybe it is okay now that we are logging the initial WARN?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a diff in pull request #13917: MINOR; Failed move should be logged at WARN
rondagostino commented on code in PR #13917: URL: https://github.com/apache/kafka/pull/13917#discussion_r1242444579

## clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
## @@ -978,9 +978,9 @@ public static void atomicMoveWithFallback(Path source, Path target, boolean need
             Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
         } catch (IOException outer) {
             try {
+                log.warn("Failed atomic move of {} to {} retring with a non-atomic move", source, target, outer);
                 Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
-                log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target,
-                    outer.getMessage());
+                log.debug("Non-atomic move of {} to {} succeeded after atomic move failed", source, target);

Review Comment: Remain DEBUG?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a diff in pull request #13917: MINOR; Failed move should be logged at WARN
jsancio commented on code in PR #13917: URL: https://github.com/apache/kafka/pull/13917#discussion_r1242443198

## clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
## @@ -979,7 +979,7 @@ public static void atomicMoveWithFallback(Path source, Path target, boolean need
         } catch (IOException outer) {
             try {
                 Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
-                log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target,
+                log.warn("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target,

Review Comment: I agree. I added a WARN log message after the atomic move failed and before the non-atomic move.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a diff in pull request #13917: MINOR; Failed move should be logged at WARN
jsancio commented on code in PR #13917: URL: https://github.com/apache/kafka/pull/13917#discussion_r1242429861

## clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
## @@ -979,7 +979,7 @@ public static void atomicMoveWithFallback(Path source, Path target, boolean need
         } catch (IOException outer) {
             try {
                 Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
-                log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target,
+                log.warn("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target,

Review Comment: Hmm. Log levels are hard to argue about. To me, it is really bad if a `move` fails and Kafka falls back to `copy`. There is a lot of code, like `Snapshot.freeze`, that assumes `move` semantics for correctness.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
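The pattern under discussion, reduced to its core as a hedged sketch (not Kafka's actual Utils.atomicMoveWithFallback): attempt the atomic move and warn before falling back, since callers such as Snapshot.freeze assume move semantics:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class Moves {
    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException outer) {
            // Surface the interesting event: the atomic move failed.
            System.err.printf("WARN failed atomic move of %s to %s, retrying non-atomically: %s%n",
                    source, target, outer);
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```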
[GitHub] [kafka] rondagostino commented on a diff in pull request #13917: MINOR; Failed move should be logged at WARN
rondagostino commented on code in PR #13917: URL: https://github.com/apache/kafka/pull/13917#discussion_r1242428311 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -979,7 +979,7 @@ public static void atomicMoveWithFallback(Path source, Path target, boolean need } catch (IOException outer) { try { Files.move(source, target, StandardCopyOption.REPLACE_EXISTING); -log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target, +log.warn("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target, Review Comment: And maybe we should log something above the retry saying we are going to retry? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a diff in pull request #13917: MINOR; Failed move should be logged at WARN
rondagostino commented on code in PR #13917: URL: https://github.com/apache/kafka/pull/13917#discussion_r1242424176 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -979,7 +979,7 @@ public static void atomicMoveWithFallback(Path source, Path target, boolean need } catch (IOException outer) { try { Files.move(source, target, StandardCopyOption.REPLACE_EXISTING); -log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target, +log.warn("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target, Review Comment: Is it really a WARN? Maybe INFO is more appropriate? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio opened a new pull request, #13917: MINOR; Failed move should be logged at WARN
jsancio opened a new pull request, #13917: URL: https://github.com/apache/kafka/pull/13917 When Kafka fails to perform a file move, the error gets swallowed. Kafka should log these cases at least at the WARN level. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
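For readers following this thread, below is a minimal, self-contained sketch of the move-with-fallback pattern under discussion, with the WARN logged before the retry as the review suggests. It is assembled from the diff hunks above, not copied from the actual Kafka source; the class name and the rethrow handling are illustrative placeholders.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MoveUtils {
    private static final Logger log = LoggerFactory.getLogger(MoveUtils.class);

    // Try an atomic move first; if the filesystem rejects it, warn loudly
    // (including the original exception) and fall back to a non-atomic move.
    public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException outer) {
            try {
                log.warn("Failed atomic move of {} to {}, retrying with a non-atomic move", source, target, outer);
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
                log.debug("Non-atomic move of {} to {} succeeded after atomic move failed", source, target);
            } catch (IOException inner) {
                // Keep the original failure attached so neither exception is swallowed.
                inner.addSuppressed(outer);
                throw inner;
            }
        }
    }
}
```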
[jira] [Updated] (KAFKA-15122) Moving partitions between log dirs leads to kafka.log:type=Log metrics being deleted
[ https://issues.apache.org/jira/browse/KAFKA-15122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-15122: --- Description: # Start a broker with 2 log directories # Create a topic-partition Metrics with the following names are created: kafka.log:type=Log,name=Size,topic=,partition=0 # Using kafka-reassign-partitions move that partition to the other log directory A tag is-future=true is added to the existing metrics, kafka.log:type=Log,name=Size,topic=,partition=0,is-future=true # Using kafka-reassign-partitions move that partition back to its original log directory The metrics are deleted! I don't expect the metrics to be renamed during the first reassignment. The metrics should not be deleted during the second reassignment, as the topic still exists. Restarting the broker resolves the issue. was: # Start a broker with 2 log directories # Create a topic-partition Metrics with the following names are created: kafka.log:type=Log,name=Size,topic=,partition=0 # Using kafka-reassign-partitions move that partition to the other log directory A tag isFuture=true is added to the existing metrics, kafka.log:type=Log,name=Size,topic=,partition=0,isFuture=true # Using kafka-reassign-partitions move that partition back to its original log directory The metrics are deleted! I don't expect the metrics to be renamed during the first reassignment. The metrics should not be deleted during the second reassignment, as the topic still exists. Restarting the broker resolves the issue. > Moving partitions between log dirs leads to kafka.log:type=Log metrics being > deleted > > > Key: KAFKA-15122 > URL: https://issues.apache.org/jira/browse/KAFKA-15122 > Project: Kafka > Issue Type: Task >Affects Versions: 3.5.0 >Reporter: Mickael Maison >Priority: Major > > # Start a broker with 2 log directories > # Create a topic-partition > Metrics with the following names are created: > kafka.log:type=Log,name=Size,topic=,partition=0 > # Using kafka-reassign-partitions move that partition to the other log > directory > A tag is-future=true is added to the existing metrics, > kafka.log:type=Log,name=Size,topic=,partition=0,is-future=true > # Using kafka-reassign-partitions move that partition back to its original > log directory > The metrics are deleted! > I don't expect the metrics to be renamed during the first reassignment. The > metrics should not be deleted during the second reassignment, as the topic > still exists. Restarting the broker resolves the issue. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15123) Add tests for ChunkedBytesStream
[ https://issues.apache.org/jira/browse/KAFKA-15123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737201#comment-17737201 ] Max Riedel commented on KAFKA-15123: Hi [~divijvaidya] This sounds like a good task to start working on Kafka. I would like to pick it up. > Add tests for ChunkedBytesStream > > > Key: KAFKA-15123 > URL: https://issues.apache.org/jira/browse/KAFKA-15123 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Priority: Minor > Labels: newbie > > We need to add test cases against the public interfaces of this class to cover > scenarios such as int overflow for input parameters > Refer: [https://github.com/apache/kafka/pull/12948#discussion_r1242345211] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15124) KRaft snapshot freeze should never perform copy
José Armando García Sancio created KAFKA-15124: -- Summary: KRaft snapshot freeze should never perform copy Key: KAFKA-15124 URL: https://issues.apache.org/jira/browse/KAFKA-15124 Project: Kafka Issue Type: Sub-task Reporter: José Armando García Sancio Assignee: José Armando García Sancio -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds
divijvaidya commented on code in PR #12948: URL: https://github.com/apache/kafka/pull/12948#discussion_r1242377704 ## clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java: ## @@ -290,25 +290,27 @@ public long skip(long toSkip) throws IOException { // Skip bytes stored in intermediate buffer first int avail = count - pos; -long bytesSkipped = (avail < remaining) ? avail : remaining; +int bytesSkipped = Math.min(avail, (int) remaining); Review Comment: > It does show a gap in the tests for this class; we should ideally have tests that cover these boundary cases. Fair point. Added a JIRA https://issues.apache.org/jira/browse/KAFKA-15123 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15123) Add tests for ChunkedBytesStream
Divij Vaidya created KAFKA-15123: Summary: Add tests for ChunkedBytesStream Key: KAFKA-15123 URL: https://issues.apache.org/jira/browse/KAFKA-15123 Project: Kafka Issue Type: Improvement Reporter: Divij Vaidya We need to add test cases against the public interfaces of this class to cover scenarios such as int overflow for input parameters. Refer: [https://github.com/apache/kafka/pull/12948#discussion_r1242345211] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15122) Moving partitions between log dirs leads to kafka.log:type=Log metrics being deleted
Mickael Maison created KAFKA-15122: -- Summary: Moving partitions between log dirs leads to kafka.log:type=Log metrics being deleted Key: KAFKA-15122 URL: https://issues.apache.org/jira/browse/KAFKA-15122 Project: Kafka Issue Type: Task Affects Versions: 3.5.0 Reporter: Mickael Maison # Start a broker with 2 log directories # Create a topic-partition Metrics with the following names are created: kafka.log:type=Log,name=Size,topic=,partition=0 # Using kafka-reassign-partitions move that partition to the other log directory A tag isFuture=true is added to the existing metrics, kafka.log:type=Log,name=Size,topic=,partition=0,isFuture=true # Using kafka-reassign-partitions move that partition back to its original log directory The metrics are deleted! I don't expect the metrics to be renamed during the first reassignment. The metrics should not be deleted during the second reassignment, as the topic still exists. Restarting the broker resolves the issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ijuma commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds
ijuma commented on code in PR #12948: URL: https://github.com/apache/kafka/pull/12948#discussion_r1242345211 ## clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java: ## @@ -290,25 +290,27 @@ public long skip(long toSkip) throws IOException { // Skip bytes stored in intermediate buffer first int avail = count - pos; -long bytesSkipped = (avail < remaining) ? avail : remaining; +int bytesSkipped = Math.min(avail, (int) remaining); Review Comment: > behaviour is implementation defined Can you share a reference for this? My understanding is that Java specifies the behavior in these cases. Perhaps you're thinking of C or something like that? > let's say remaining = Integer.MIN_VALUE + 1000 I think you meant to say that a `long` of value (Integer.MAX_VALUE + 1000) may result in sign extension during truncation, resulting in a negative int. That's a fair point - my bad. I'll fix this. It does show a gap in the tests for this class; we should ideally have tests that cover these boundary cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0
[ https://issues.apache.org/jira/browse/KAFKA-15053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15053: - Fix Version/s: 3.6.0 > Regression for security.protocol validation starting from 3.3.0 > --- > > Key: KAFKA-15053 > URL: https://issues.apache.org/jira/browse/KAFKA-15053 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.0 >Reporter: Bo Gao >Assignee: Bo Gao >Priority: Major > Fix For: 3.6.0 > > > [This|https://issues.apache.org/jira/browse/KAFKA-13793] Jira issue > introduced validations on multiple configs. As a consequence, config > {{security.protocol}} now only allows upper case values such as PLAINTEXT, > SSL, SASL_PLAINTEXT, SASL_SSL. Before this change, lower case values like > sasl_ssl and ssl were also supported; there's even case-insensitive logic > inside > [SecurityProtocol|https://github.com/apache/kafka/blob/146a6976aed0d9f90c70b6f21dca8b887cc34e71/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L70-L73] > to handle the lower case values. > I think we should treat this as a regression bug since lower case values are > no longer supported as of 3.3.0. For versions later than 3.3.0, we get an > error like this when using the lower case value sasl_ssl: > {{Invalid value sasl_ssl for configuration security.protocol: String must be > one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
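As context for the fix version bump above, here is a minimal sketch of the case-insensitive validation the report asks for. This is an illustration only, not the actual Kafka ConfigDef validator; the class name and error message are made up.

```java
import java.util.Locale;
import java.util.Set;

public final class CaseInsensitiveProtocolValidator {
    private static final Set<String> ALLOWED =
            Set.of("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL");

    // Accept any casing (e.g. "sasl_ssl") by normalizing before the membership check.
    public static void validate(String name, String value) {
        if (value == null || !ALLOWED.contains(value.toUpperCase(Locale.ROOT))) {
            throw new IllegalArgumentException("Invalid value " + value + " for configuration "
                    + name + ": String must be one of (case insensitive): " + ALLOWED);
        }
    }

    public static void main(String[] args) {
        validate("security.protocol", "sasl_ssl"); // passes once validation is case-insensitive
    }
}
```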
[GitHub] [kafka] divijvaidya commented on pull request #13700: KAFKA-14959: remove delayed queue and exempt sensors during ClientQuota and ClientRequestQuota managers shutdown
divijvaidya commented on PR #13700: URL: https://github.com/apache/kafka/pull/13700#issuecomment-1607659721 I wanted to let you know that this is on my radar and I need a few more days to get to this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds
divijvaidya commented on code in PR #12948: URL: https://github.com/apache/kafka/pull/12948#discussion_r1242328144 ## clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java: ## @@ -290,25 +290,27 @@ public long skip(long toSkip) throws IOException { // Skip bytes stored in intermediate buffer first int avail = count - pos; -long bytesSkipped = (avail < remaining) ? avail : remaining; +int bytesSkipped = Math.min(avail, (int) remaining); Review Comment: I am concerned about cases where `remaining` is outside the range of representable `int` values, let's say `remaining = Integer.MIN_VALUE + 1000`. When down-casting in cases of overflow (i.e. in `Math.min(avail, (int) remaining)`), the behaviour is implementation defined and the numeric value of `(int) remaining` may end up as a negative int. Calculating the min of this negative value and `avail` will leave `bytesSkipped` negative. In the implementation I suggested, the result of `Math.min((long) avail, remaining)` is guaranteed to fit in an `int` because it is upper-bounded by `avail`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
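To make the hazard in this thread concrete, here is a small stand-alone demonstration of the two variants, with values chosen purely for illustration:

```java
public class TruncationDemo {
    public static void main(String[] args) {
        long remaining = (long) Integer.MAX_VALUE + 1000L; // 2147484647, does not fit in an int
        int avail = 512;

        // Cast-then-min: the narrowing conversion wraps around to a negative int,
        // so min() picks the bogus negative value.
        int unsafe = Math.min(avail, (int) remaining);
        System.out.println((int) remaining); // -2147482649 after wrap-around
        System.out.println(unsafe);          // -2147482649: bytesSkipped would go negative

        // Min-then-cast: comparing in long space first bounds the result by avail,
        // so the narrowing conversion is always safe.
        int safe = (int) Math.min((long) avail, remaining);
        System.out.println(safe);            // 512
    }
}
```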
[jira] [Commented] (KAFKA-15105) Flaky test FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable
[ https://issues.apache.org/jira/browse/KAFKA-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737186#comment-17737186 ] Max Riedel commented on KAFKA-15105: I would like to work on this issue. I'm still trying to understand how the build infrastructure works. Can someone give me a hint on how to reproduce the behavior? > Flaky test > FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable > - > > Key: KAFKA-15105 > URL: https://issues.apache.org/jira/browse/KAFKA-15105 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.5.0 >Reporter: Josep Prat >Priority: Major > Labels: flaky-test > > Test > integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable() > became flaky. An example can be found here: > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable__/] > The error might be caused by a previous Kafka cluster used for another test > not being cleaned up properly before this one ran. > > h3. Error Message > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists.{code} > h3. Stacktrace > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13831: KAFKA-15053: Use case insensitive validator for security.protocol config
divijvaidya commented on code in PR #13831: URL: https://github.com/apache/kafka/pull/13831#discussion_r1242302387 ## docs/security.html: ## @@ -72,6 +72,10 @@ SSL SASL_PLAINTEXT SASL_SSL + plaintext Review Comment: Instead of adding all values, please consider the following: `Possible options (case insensitive) for the security protocol are given below:` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds
ijuma commented on code in PR #12948: URL: https://github.com/apache/kafka/pull/12948#discussion_r1242312432 ## Jenkinsfile: ## @@ -155,79 +155,23 @@ pipeline { echo 'Skipping Kafka Streams archetype test for Java 17' } } - -// To avoid excessive Jenkins resource usage, we only run the stages -// above at the PR stage. The ones below are executed after changes -// are pushed to trunk and/or release branches. We achieve this via -// the `when` clause. - -stage('JDK 8 and Scala 2.13') { Review Comment: That statement isn't strictly true since there could be bugs in the implementation - that's why we had these tests in the first place. But the truth is that they're low value since (1) the probability of a bug affecting JDK 8 and Scala 2.13 but not JDK 8 and Scala 2.12 is low, (2) having too many variants makes it even less likely for people to pay attention to the flaky failures that exist, and (3) it's expensive to run so many variants. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds
ijuma commented on code in PR #12948: URL: https://github.com/apache/kafka/pull/12948#discussion_r1242309422 ## clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java: ## @@ -290,25 +290,27 @@ public long skip(long toSkip) throws IOException { // Skip bytes stored in intermediate buffer first int avail = count - pos; -long bytesSkipped = (avail < remaining) ? avail : remaining; +int bytesSkipped = Math.min(avail, (int) remaining); Review Comment: Can you please share an example where these two variants would have a different result given that `avail` is a positive int? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
divijvaidya commented on PR #12685: URL: https://github.com/apache/kafka/pull/12685#issuecomment-1607609145 @LinShunKang Give me a few days. I will get to it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13915: KAFKA-14930: Document the new PATCH and DELETE offsets REST APIs for Connect
C0urante commented on code in PR #13915: URL: https://github.com/apache/kafka/pull/13915#discussion_r1242245895 ## docs/connect.html: ## @@ -301,7 +301,7 @@ REST API GET /connectors/{name}/tasks - get a list of tasks currently running for a connector GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed. Any resources claimed by its tasks are left allocated, which allows the connector to begin processing data quickly once it is resumed. -PUT /connectors/{name}/stop - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. +PUT /connectors/{name}/stop - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. Note that the offsets for a connector can be modified via the offsets management REST APIs only if it is in the stopped state. Review Comment: ```suggestion PUT /connectors/{name}/stop - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. Note that the offsets for a connector can only be modified via the offsets management endpoints if it is in the stopped state. ``` ## docs/connect.html: ## @@ -313,7 +313,13 @@ REST API DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration GET /connectors/{name}/topics - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued PUT /connectors/{name}/topics/reset - send a request to empty the set of active topics of a connector -GET /connectors/{name}/offsets - get the current offsets for a connector (see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect">KIP-875</a> for more details) +Offsets management REST APIs (see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect">KIP-875</a> for more details): + +GET /connectors/{name}/offsets - get the current offsets for a connector +DELETE /connectors/{name}/offsets - reset the offsets for a connector. The connector must exist and must be in the stopped state. Review Comment: Nit: for consistency, we should leave out the period from the last sentence of each item ```suggestion DELETE /connectors/{name}/offsets - reset the offsets for a connector. The connector must exist and must be in the stopped state ``` Also, would it be possible to link to the docs for the `PUT /connectors/{name}/stop` endpoint when we refer to it here?
## docs/connect.html: ## @@ -313,7 +313,13 @@ REST API DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration GET /connectors/{name}/topics - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued PUT /connectors/{name}/topics/reset - send a request to empty the set of active topics of a connector -GET /connectors/{name}/offsets - get the current offsets for a connector (see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect">KIP-875</a> for more details) +Offsets management REST APIs (see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect">KIP-875</a> for more details): + +GET /connectors/{name}/offsets - get the current offsets for a connector +DELETE /connectors/{name}/offsets - reset the offsets for a connector. The connector must exist and must be in the stopped state. +PATCH /connectors/{name}/offsets - alter the offsets for a connector. The connector must exist and must be in the stopped state. The request body should be a JSON object containing a JSON array offsets field, similar to the response body of the GET /connectors/{name}/offsets REST API. Review Comment: Same nit RE trailing period,
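For anyone trying the endpoints described in this doc change, below is a hypothetical invocation sketch using Java's built-in HTTP client. The worker URL, connector name, and offsets payload are made up for illustration; the body shape (a JSON object with an offsets array) follows the docs text above, using the sink-connector form described in KIP-875.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class AlterConnectorOffsets {
    public static void main(String[] args) throws Exception {
        // Hypothetical payload: set the committed offset for topic t1, partition 0.
        String body = "{\"offsets\": [{"
                + "\"partition\": {\"kafka_topic\": \"t1\", \"kafka_partition\": 0},"
                + "\"offset\": {\"kafka_offset\": 42}}]}";

        // The connector must already be in the STOPPED state for this to succeed.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-connector/offsets"))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```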
[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito
clolov commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1242226466 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2688,20 +2614,8 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo .thenReturn(singletonList(task10)); final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId"); -expect(consumer.groupMetadata()).andReturn(groupMetadata); -producer.commitTransaction(expectedCommittedOffsets, groupMetadata); -expectLastCall(); - -task00.committedOffsets(); -EasyMock.expectLastCall(); -task01.committedOffsets(); -EasyMock.expectLastCall(); -task02.committedOffsets(); -EasyMock.expectLastCall(); -task10.committedOffsets(); -EasyMock.expectLastCall(); - -replay(consumer); +when(consumer.groupMetadata()).thenReturn(groupMetadata); +doNothing().when(producer).commitTransaction(expectedCommittedOffsets, groupMetadata); Review Comment: Alright, I removed it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito
clolov commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1242149622 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) { }; // `handleAssignment` -expectRestoreToBeCompleted(consumer); +expectAssignmentToBeCalled(consumer); when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); -expect(consumer.assignment()).andReturn(taskId00Partitions); -replay(consumer); +final Set partitions = union(HashSet::new, taskId00Partitions); +when(consumer.assignment()).thenReturn(partitions); Review Comment: ~~Oh, silly me, there is a mkSet method in Utils. Okay, I will remedy this.~~ Silly me x2, the thing is already a set -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito
clolov commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1242148687 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) { }; // `handleAssignment` -expectRestoreToBeCompleted(consumer); +expectAssignmentToBeCalled(consumer); when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); -expect(consumer.assignment()).andReturn(taskId00Partitions); -replay(consumer); +final Set partitions = union(HashSet::new, taskId00Partitions); +when(consumer.assignment()).thenReturn(partitions); Review Comment: ~~Because I couldn't figure out a better way to create a set with only one element and Collections does not have a singletonSet~~ 😞 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito
clolov commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1242151305 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) { }; // `handleAssignment` -expectRestoreToBeCompleted(consumer); +expectAssignmentToBeCalled(consumer); when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); -expect(consumer.assignment()).andReturn(taskId00Partitions); -replay(consumer); +final Set partitions = union(HashSet::new, taskId00Partitions); +when(consumer.assignment()).thenReturn(partitions); Review Comment: You are correct, it appears I can remove line 1923 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito
clolov commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1242149622 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) { }; // `handleAssignment` -expectRestoreToBeCompleted(consumer); +expectAssignmentToBeCalled(consumer); when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); -expect(consumer.assignment()).andReturn(taskId00Partitions); -replay(consumer); +final Set partitions = union(HashSet::new, taskId00Partitions); +when(consumer.assignment()).thenReturn(partitions); Review Comment: Oh, silly me, there is a mkSet method in Utils. Okay, I will remedy this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito
clolov commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1242148687 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) { }; // `handleAssignment` -expectRestoreToBeCompleted(consumer); +expectAssignmentToBeCalled(consumer); when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); -expect(consumer.assignment()).andReturn(taskId00Partitions); -replay(consumer); +final Set partitions = union(HashSet::new, taskId00Partitions); +when(consumer.assignment()).thenReturn(partitions); Review Comment: Because I couldn't figure out a better way to create a set with only one element and Collections does not have a singletonSet 😞 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito
clolov commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1242144879 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2418,19 +2368,18 @@ public void markChangelogAsCorrupted(final Collection partitions mkEntry(taskId02, taskId02Partitions) ); -expectRestoreToBeCompleted(consumer); +expectAssignmentToBeCalled(consumer); when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive))) .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded)); final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId"); -expect(consumer.groupMetadata()).andReturn(groupMetadata); +when(consumer.groupMetadata()).thenReturn(groupMetadata); doThrow(new TimeoutException()).when(producer).commitTransaction(expectedCommittedOffsets, groupMetadata); -expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions)); - -replay(consumer, stateManager); Review Comment: 😱 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2418,19 +2368,18 @@ public void markChangelogAsCorrupted(final Collection partitions mkEntry(taskId02, taskId02Partitions) ); -expectRestoreToBeCompleted(consumer); +expectAssignmentToBeCalled(consumer); when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive))) .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded)); final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId"); -expect(consumer.groupMetadata()).andReturn(groupMetadata); +when(consumer.groupMetadata()).thenReturn(groupMetadata); doThrow(new TimeoutException()).when(producer).commitTransaction(expectedCommittedOffsets, groupMetadata); -expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions)); - -replay(consumer, stateManager); Review Comment: I will re-add it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito
clolov commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1242142731 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2469,19 +2416,18 @@ public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() { assertThat(task00.state(), is(Task.State.CLOSED)); assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); +verifyResumeWasCalled(consumer); Review Comment: On the same topic, yes, I could inline the setup and if you are okay with it I will then inline the verification as well. Let me know your thoughts! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito
clolov commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1242140474 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2469,19 +2416,18 @@ public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() { assertThat(task00.state(), is(Task.State.CLOSED)); assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); +verifyResumeWasCalled(consumer); Review Comment: I will answer here, but the answer applies to all the other places you have asked it as well. The original code for `expectRestoreToBeCompleted` was: ``` private static void expectRestoreToBeCompleted(final Consumer consumer) { final Set assignment = singleton(new TopicPartition("assignment", 0)); expect(consumer.assignment()).andReturn(assignment); consumer.resume(assignment); expectLastCall(); } ``` This should in theory be changed to ``` private static void expectRestoreToBeCompleted(final Consumer consumer) { final Set assignment = singleton(new TopicPartition("assignment", 0)); when(consumer.assignment()).thenReturn(assignment); Mockito.verify(consumer).resume(assignment); } ``` However, if we have the verify there it will fail as it needs to be verified after the object/method under test is exercised. Hence I moved it to a separate function and added it as a verification at the end of the test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
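To restate the ordering constraint from the comment above in a runnable form, here is a minimal sketch. The mock type is simplified to a local interface and all names are illustrative; it only shows the stub-before, verify-after pattern that distinguishes Mockito from EasyMock's record/replay model.

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.Set;

public class StubThenVerifyExample {
    interface Consumer {
        Set<String> assignment();
        void resume(Set<String> partitions);
    }

    public static void main(String[] args) {
        Consumer consumer = mock(Consumer.class);
        Set<String> assignment = Collections.singleton("assignment-0");

        // Stubbing happens BEFORE the code under test runs...
        when(consumer.assignment()).thenReturn(assignment);

        // ...the code under test then exercises the mock...
        consumer.resume(consumer.assignment());

        // ...and verification happens AFTER exercising, which is why the
        // verify call cannot live inside the setup helper.
        verify(consumer).resume(assignment);
    }
}
```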
[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito
clolov commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1242132233 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -4050,12 +3928,8 @@ public void shouldHaveRemainingPartitionsUncleared() { final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets); -expectRestoreToBeCompleted(consumer); +expectAssignmentToBeCalled(consumer); when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); -consumer.commitSync(offsets); Review Comment: This is not called, which is why I removed it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito
clolov commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1242131444 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -3418,16 +3311,12 @@ public void shouldCommitProvidedTasksIfNeeded() { mkEntry(taskId05, taskId05Partitions) ); -expectRestoreToBeCompleted(consumer); +expectAssignmentToBeCalled(consumer); when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive))) .thenReturn(Arrays.asList(task00, task01, task02)); when(standbyTaskCreator.createTasks(assignmentStandby)) .thenReturn(Arrays.asList(task03, task04, task05)); -consumer.commitSync(eq(emptyMap())); Review Comment: This is just not called; that's why I removed it originally -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito
clolov commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1242121402 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -3219,17 +3128,6 @@ public void shouldCloseStandbyTasksOnShutdown() { // `handleAssignment` when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00)); -// `tryToCompleteRestoration` -expect(consumer.assignment()).andReturn(emptySet()); -consumer.resume(eq(emptySet())); -expectLastCall(); - -// `shutdown` -consumer.commitSync(Collections.emptyMap()); Review Comment: ~~I don't think there is a need for verification here. The consumer was only replayed in EasyMock and the replayed behaviour comes by default in Mockito~~ This is just not called; that's why I removed it originally -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito
clolov commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1242121402 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -3219,17 +3128,6 @@ public void shouldCloseStandbyTasksOnShutdown() { // `handleAssignment` when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00)); -// `tryToCompleteRestoration` -expect(consumer.assignment()).andReturn(emptySet()); -consumer.resume(eq(emptySet())); -expectLastCall(); - -// `shutdown` -consumer.commitSync(Collections.emptyMap()); Review Comment: I don't think there is a need for verification here. The consumer was only replayed in EasyMock and the replayed behaviour comes by default in Mockito -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito
cadonna commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1241990023 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() { assertEquals(taskManager.notPausedTasks().size(), 0); } -private static void expectRestoreToBeCompleted(final Consumer consumer) { +private static void expectAssignmentToBeCalled(final Consumer consumer) { final Set assignment = singleton(new TopicPartition("assignment", 0)); -expect(consumer.assignment()).andReturn(assignment); -consumer.resume(assignment); -expectLastCall(); +when(consumer.assignment()).thenReturn(assignment); +} Review Comment: I would inline this function since it became a one-liner. ```java when(consumer.assignment()).thenReturn(singleton(new TopicPartition("assignment", 0))); ``` ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2034,12 +1993,9 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { // `handleAssignment` when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))) .thenReturn(asList(corruptedTask, nonCorruptedTask)); -expectRestoreToBeCompleted(consumer); -expect(consumer.assignment()).andReturn(taskId00Partitions); -// check that we should not commit empty map either -consumer.commitSync(eq(emptyMap())); -expectLastCall().andStubThrow(new AssertionError("should not invoke commitSync when offset map is empty")); -replay(consumer); +expectAssignmentToBeCalled(consumer); +final Set partitions = union(HashSet::new, taskId00Partitions); Review Comment: Same here ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2449,17 +2398,15 @@ public void markChangelogAsCorrupted(final Collection partitions assertThat(revokedActiveTask.state(), is(State.SUSPENDED)); assertThat(unrevokedActiveTask.state(), is(State.CREATED)); assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING)); +verifyResumeWasCalledWith(consumer, partitions); Review Comment: Why do you verify? It was not verified in the original code. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2377,6 +2326,7 @@ public void markChangelogAsCorrupted(final Collection partitions assertThat(revokedActiveTask.state(), is(State.SUSPENDED)); assertThat(unrevokedActiveTaskWithCommitNeeded.state(), is(State.CREATED)); assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING)); +verifyResumeWasCalledWith(consumer, partition); Review Comment: Why do you verify? It was not verified in the original code. I added a couple of these comments below. However, I am not sure whether the original author did not want to verify the consumer or if they forgot about it. I will leave it to you if you want to keep them or not. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2805,6 +2716,7 @@ public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() { assertThat(task00.commitNeeded, is(true)); assertThat(task10.commitPrepared, is(false)); +verifyResumeWasCalled(consumer); Review Comment: Why do you verify? It was not verified in the original code.
## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2834,14 +2744,14 @@ public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() { taskManager.handleAssignment(assignmentActive, Collections.emptyMap()); assertThat(task00.commitNeeded, is(true)); +verifyResumeWasCalled(consumer); Review Comment: Why do you verify? It was not verified in the original code. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2688,20 +2614,8 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo .thenReturn(singletonList(task10)); final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId"); -expect(consumer.groupMetadata()).andReturn(groupMetadata); -producer.commitTransaction(expectedCommittedOffsets, groupMetadata); -expectLastCall(); - -task00.committedOffsets(); -EasyMock.expectLastCall(); -task01.committedOffsets(); -EasyMock.expectLastCall(); -task02.committedOffsets(); -EasyMock.expectLastCall(); -task10.committedOffsets(); -EasyMock.expectLastCall(); Revie