[kafka] branch trunk updated: KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing (#14455)
This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 3c9031c6245 KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing (#14455) 3c9031c6245 is described below commit 3c9031c62455e4eaa3f5d16a3bba94d7e3159fb6 Author: Gantigmaa Selenge <39860586+tinasele...@users.noreply.github.com> AuthorDate: Wed Oct 11 04:41:46 2023 +0100 KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing (#14455) AdminClient will throw IllegalStateException instead of TimeoutException if it receives new calls while closing down. This is more consistent with how Consumer and Producer clients handle new calls after closed down. Reviewers: Luke Chen , Kirk True , Kamal Chandraprakash , vamossagar12 --- .../org/apache/kafka/clients/admin/KafkaAdminClient.java | 5 ++--- .../org/apache/kafka/clients/admin/KafkaAdminClientTest.java | 12 .../kafka/api/PlaintextAdminIntegrationTest.scala| 4 ++-- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 4db6b271946..27d28b5b336 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1546,9 +1546,8 @@ public class KafkaAdminClient extends AdminClient { */ void call(Call call, long now) { if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) { -log.debug("The AdminClient is not accepting new calls. 
Timing out {}.", call); -call.handleTimeoutFailure(time.milliseconds(), -new TimeoutException("The AdminClient thread is not accepting new calls.")); +log.debug("Cannot accept new call {} when AdminClient is closing.", call); +call.handleFailure(new IllegalStateException("Cannot accept new calls when AdminClient is closing.")); } else if (metadataManager.usingBootstrapControllers() && (!call.nodeProvider.supportsUseControllers())) { call.fail(now, new UnsupportedEndpointTypeException("This Admin API is not " + diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 229d3119871..378e08b2c45 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -478,6 +478,18 @@ public class KafkaAdminClientTest { callbackCalled.acquire(); } +@Test +public void testAdminClientFailureWhenClosed() { +MockTime time = new MockTime(); +AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(3, 0)); +env.adminClient().close(); +ExecutionException e = assertThrows(ExecutionException.class, () -> env.adminClient().createTopics( +singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2, +new CreateTopicsOptions().timeoutMs(1)).all().get()); +assertTrue(e.getCause() instanceof IllegalStateException, +"Expected an IllegalStateException error, but got " + Utils.stackTrace(e)); +} + private static OffsetDeleteResponse prepareOffsetDeleteResponse(Errors error) { return new OffsetDeleteResponse( new OffsetDeleteResponseData() diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 784374d23e8..5bb3533146c 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -1026,7 +1026,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { /** * Test closing the AdminClient with a generous timeout. Calls in progress should be completed, -* since they can be done within the timeout. New calls should receive timeouts. +* since they can be done within the timeout. New calls should receive exceptions. */ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) @@ -1037,7 +1037,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() client.close(time.
[kafka-site] branch asf-site updated: Added a blog entry for 3.6.0 release (#547)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new d3a5afeb Added a blog entry for 3.6.0 release (#547) d3a5afeb is described below commit d3a5afeb4ed3c958f8f77c6bbf34ac137533eb82 Author: Satish Duggana AuthorDate: Tue Oct 10 20:29:35 2023 -0700 Added a blog entry for 3.6.0 release (#547) Reviewers: David Arthur , Greg Harris, Christo Lolov , Gaurav Narula , Divij Vaidya , Mickael Maison , Justine Olshan --- blog.html | 138 ++ 1 file changed, 138 insertions(+) diff --git a/blog.html b/blog.html index c339be70..8774d7cf 100644 --- a/blog.html +++ b/blog.html @@ -22,6 +22,144 @@ Blog + + + +Apache Kafka 3.6.0 Release Announcement + +10 Oct 2023 - Satish Duggana (https://twitter.com/0xeed";>@SatishDuggana) +We are proud to announce the release of Apache Kafka 3.6.0. This release contains many new features and improvements. This blog post will highlight some of the more prominent features. For a full list of changes, be sure to check the https://downloads.apache.org/kafka/3.6.0/RELEASE_NOTES.html";>release notes. +See the https://kafka.apache.org/36/documentation.html#upgrade_3_6_0";>Upgrading to 3.6.0 from any version 0.8.x through 3.5.x section in the documentation for the list of notable changes and detailed upgrade steps. + +The ability to migrate Kafka clusters from a ZooKeeper metadata system to a KRaft metadata system is +now ready for usage in production environments. See the ZooKeeper to KRaft migration +https://kafka.apache.org/documentation/#kraft_zk_migration";>operations documentation for +details. Note that support for JBOD is still not available for KRaft clusters, therefore clusters +utilizing JBOD can not be migrated. 
See https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft";>KIP-858 +for details regarding KRaft and JBOD. + +Support for Delegation Tokens in KRaft (https://issues.apache.org/jira/browse/KAFKA-15219";>KAFKA-15219) was completed in 3.6, further reducing the gap of features between ZooKeeper-based Kafka clusters and KRaft. Migration of delegation tokens from ZooKeeper to KRaft is also included in 3.6. +https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage";>Tiered Storage is an early access feature. It is currently only suitable for testing in non production environments. See the https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+Early+Access+Release+Notes";>Early Access Release Notes for more details. + +Note: ZooKeeper is marked as deprecated since 3.5.0 release. ZooKeeper is planned to be removed in Apache Kafka 4.0. For more information, please see the documentation for ZooKeeper Deprecation +Kafka Broker, Controller, Producer, Consumer and Admin Client + +https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage";>KIP-405: +Kafka Tiered Storage (Early Access): Introduces Tiered Storage to Kafka. Note that this +is an early access feature only advised for use in non-production environments (see the https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+Early+Access+Release+Notes";>early +access notes for more information). This feature provides a separation of computation +and storage in the broker for pluggable storage tiering natively in Kafka. Tiered Storage brings +a seamless extension of storage to remote objects with minimal operational changes. + +https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense";>KIP-890: +Transactions Server Side Defense (Part 1): Hanging transactions can negatively impact +your read committed consumers and prevent compacted logs from being compacted. 
KIP-890 helps +address hanging transactions by verifying partition additions. Part 2 of KIP-890 will optimize +verification, which currently adds an extra hop. +In 3.6.0, transaction verification will prevent hanging transactions on data partitions. In the next release, transactional offset commits will also be covered. +
[kafka-site] branch asf-site updated: MINOR Update default docs point to 3.6.0 release docs (#556)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 3381b69c MINOR Update default docs point to 3.6.0 release docs (#556) 3381b69c is described below commit 3381b69c04601f4cda6aedd2fe32462c1f8b19d2 Author: Satish Duggana AuthorDate: Tue Oct 10 20:06:49 2023 -0700 MINOR Update default docs point to 3.6.0 release docs (#556) Reviewers: Luke Chen , Divij Vaidya --- 35/documentation.html | 2 +- 35/streams/architecture.html | 2 +- 35/streams/core-concepts.html | 2 +- 35/streams/developer-guide/app-reset-tool.html | 2 +- 35/streams/developer-guide/config-streams.html | 2 +- 35/streams/developer-guide/datatypes.html | 2 +- 35/streams/developer-guide/dsl-api.html| 2 +- .../developer-guide/dsl-topology-naming.html | 2 +- 35/streams/developer-guide/index.html | 2 +- .../developer-guide/interactive-queries.html | 2 +- 35/streams/developer-guide/manage-topics.html | 2 +- 35/streams/developer-guide/memory-mgmt.html| 2 +- 35/streams/developer-guide/processor-api.html | 2 +- 35/streams/developer-guide/running-app.html| 2 +- 35/streams/developer-guide/security.html | 2 +- 35/streams/developer-guide/testing.html| 2 +- 35/streams/developer-guide/write-streams.html | 2 +- 35/streams/index.html | 2 +- 35/streams/quickstart.html | 2 +- 35/streams/tutorial.html | 2 +- 35/streams/upgrade-guide.html | 2 +- documentation.html | 2 +- documentation/index.html | 2 +- documentation/streams/architecture.html| 2 +- documentation/streams/core-concepts.html | 2 +- .../streams/developer-guide/app-reset-tool.html| 2 +- .../streams/developer-guide/config-streams.html| 2 +- .../streams/developer-guide/datatypes.html | 2 +- documentation/streams/developer-guide/dsl-api.html | 2 +- .../developer-guide/dsl-topology-naming.html | 2 +- documentation/streams/developer-guide/index.html | 2 +- 
.../developer-guide/interactive-queries.html | 2 +- .../streams/developer-guide/manage-topics.html | 2 +- .../streams/developer-guide/memory-mgmt.html | 2 +- .../streams/developer-guide/processor-api.html | 2 +- .../streams/developer-guide/running-app.html | 2 +- .../streams/developer-guide/security.html | 2 +- documentation/streams/developer-guide/testing.html | 2 +- .../streams/developer-guide/write-streams.html | 2 +- documentation/streams/index.html | 2 +- documentation/streams/quickstart.html | 2 +- documentation/streams/upgrade-guide.html | 2 +- downloads.html | 31 +- intro.html | 2 +- protocol.html | 2 +- quickstart.html| 2 +- uses.html | 2 +- 47 files changed, 76 insertions(+), 47 deletions(-) diff --git a/35/documentation.html b/35/documentation.html index 83b59b93..58cbd332 100644 --- a/35/documentation.html +++ b/35/documentation.html @@ -30,7 +30,7 @@ - + Documentation Kafka 3.5 Documentation diff --git a/35/streams/architecture.html b/35/streams/architecture.html index f0561296..139f5e3f 100644 --- a/35/streams/architecture.html +++ b/35/streams/architecture.html @@ -179,7 +179,7 @@ - + Documentation Kafka Streams diff --git a/35/streams/core-concepts.html b/35/streams/core-concepts.html index d9a2851e..07645ec1 100644 --- a/35/streams/core-concepts.html +++ b/35/streams/core-concepts.html @@ -351,7 +351,7 @@ - + Documentation Kafka Streams diff --git a/35/streams/developer-guide/app-reset-tool.html b/35/streams/developer-guide/app-reset-tool.html index a877b54f..46ba2e21 100644 --- a/35/streams/developer-guide/app-reset-tool.html +++ b/35/streams/developer-guide/app-reset-tool.html @@ -167,7 +167,7 @@ - + Documentation Kafka Streams diff --git a/35/streams/developer-guide/config-streams.html b/35/streams/developer-guide/config-streams.html index a38e8b3d..65a2a6b2 100644 --- a/35/streams/developer-guide/config-streams.html +++ b/35/streams/developer-guide/config-streams.html @@ -1127,7 +1127,7 @@ 
streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic
[kafka] branch trunk updated: KAFKA-15169: Added TestCase in RemoteIndexCache (#14482)
This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 99ce2e081c5 KAFKA-15169: Added TestCase in RemoteIndexCache (#14482) 99ce2e081c5 is described below commit 99ce2e081c575eeb6f02a046e3b6530ca6402ad2 Author: Arpit Goyal <59436466+iit2009...@users.noreply.github.com> AuthorDate: Wed Oct 11 08:28:17 2023 +0530 KAFKA-15169: Added TestCase in RemoteIndexCache (#14482) Test Cases Covered 1. Index Files already exist on disk but not in Cache i.e. RemoteIndexCache should not call remoteStorageManager to fetch it; instead, it should cache it from the local index file present. 2. RSM returns CorruptedIndex File i.e. RemoteIndexCache should throw CorruptIndexException instead of successful execution. 3. Deleted Suffix Indexes file already present on disk i.e. If cleaner thread is slow, then there is a chance of deleted index files present on the disk while in parallel same index Entry is invalidated. 
To understand more refer https://issues.apache.org/jira/browse/KAFKA-15169 Reviewers: Divij Vaidya , Luke Chen , Kamal Chandraprakash --- .../kafka/log/remote/RemoteIndexCacheTest.scala| 275 +++-- .../storage/internals/log/RemoteIndexCache.java| 4 +- 2 files changed, 260 insertions(+), 19 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala index b9482217062..07f07757b31 100644 --- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala +++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala @@ -17,25 +17,28 @@ package kafka.log.remote import kafka.utils.TestUtils +import kafka.utils.TestUtils.waitUntilTrue import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager} import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName} -import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex} +import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex} import org.apache.kafka.test.{TestUtils => JTestUtils} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.EnumSource import 
org.mockito.ArgumentMatchers import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ import org.slf4j.{Logger, LoggerFactory} -import java.io.{File, FileInputStream, IOException, PrintWriter} -import java.nio.file.Files +import java.io.{File, FileInputStream, FileNotFoundException, IOException, PrintWriter} +import java.nio.file.{Files, Paths} import java.util -import java.util.Collections +import java.util.{Collections, Optional} import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} import scala.collection.mutable @@ -516,10 +519,14 @@ class RemoteIndexCacheTest { def testClearCacheAndIndexFilesWhenResizeCache(): Unit = { def getIndexFileFromRemoteCacheDir(suffix: String) = { - Files.walk(cache.cacheDir()) -.filter(Files.isRegularFile(_)) -.filter(path => path.getFileName.toString.endsWith(suffix)) -.findAny() + try { +Files.walk(cache.cacheDir().toPath()) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .findAny() + } catch { +case _: FileNotFoundException => Optional.empty() + } } val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)) @@ -554,23 +561,223 @@ class RemoteIndexCacheTest { assertTrue(cache.internalCache().estimatedSize() == 0) } - @Test - def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = { -// create Corrupt Offset Index File -createCorruptRemoteIndexCacheOffsetFile() + @ParameterizedTest + @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION")) + def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = { +// create Corrupted Index File in r
[kafka] branch trunk updated: KAFKA-14927: Add validation to be config keys in ConfigCommand tool (#14514)
This is an automated email from the ASF dual-hosted git repository. manikumar pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 6e164bb9ace KAFKA-14927: Add validation to be config keys in ConfigCommand tool (#14514) 6e164bb9ace is described below commit 6e164bb9ace3ea7a1a9542904d1a01c9fd3a1b48 Author: Aman Singh <103091061+singhn...@users.noreply.github.com> AuthorDate: Tue Oct 10 13:19:13 2023 +0530 KAFKA-14927: Add validation to be config keys in ConfigCommand tool (#14514) Added validation in ConfigCommand tool, only allow characters '([a-z][A-Z][0-9][._-])*' for config keys. Reviewers: Manikumar Reddy --- core/src/main/scala/kafka/admin/ConfigCommand.scala | 13 +++-- .../scala/unit/kafka/admin/ConfigCommandTest.scala | 20 ++-- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 309d95aaab8..25d400918f6 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -297,20 +297,29 @@ object ConfigCommand extends Logging { "This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " + "if the inter.broker.protocol.version is 3.0 or newer. 
This configuration is deprecated and it will be removed in Apache Kafka 4.0.") } +validatePropsKey(props) props } private[admin] def parseConfigsToBeDeleted(opts: ConfigCommandOptions): Seq[String] = { if (opts.options.has(opts.deleteConfig)) { val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfig).asScala.map(_.trim()) - val propsToBeDeleted = new Properties - configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, "")) configsToBeDeleted } else Seq.empty } + private def validatePropsKey(props: Properties): Unit = { +props.keySet.forEach { propsKey => + if (!propsKey.toString.matches("[a-zA-Z0-9._-]*")) { +throw new IllegalArgumentException( + s"Invalid character found for config key: ${propsKey}" +) + } +} + } + private def processCommand(opts: ConfigCommandOptions): Unit = { val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index 124227dd1f5..12e460a0bd3 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -304,15 +304,31 @@ class ConfigCommandTest extends Logging { createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2, shortFlag, "1", "--alter", - "--add-config", "a=b,c=,d=e,f=")) + "--add-config", "a._-c=b,c=,d=e,f=")) createOpts.checkArgs() val addedProps2 = ConfigCommand.parseConfigsToBeAdded(createOpts) assertEquals(4, addedProps2.size()) -assertEquals("b", addedProps2.getProperty("a")) +assertEquals("b", addedProps2.getProperty("a._-c")) assertEquals("e", addedProps2.getProperty("d")) assertTrue(addedProps2.getProperty("c").isEmpty) assertTrue(addedProps2.getProperty("f").isEmpty) + +var inValidCreateOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2, + shortFlag, "1", + "--alter", + "--add-config", "a;c=b")) + 
+assertThrows(classOf[IllegalArgumentException], + () => ConfigCommand.parseConfigsToBeAdded(inValidCreateOpts)) + +inValidCreateOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2, + shortFlag, "1", + "--alter", + "--add-config", "a,=b")) + +assertThrows(classOf[IllegalArgumentException], + () => ConfigCommand.parseConfigsToBeAdded(inValidCreateOpts)) } @Test