[kafka] branch trunk updated: KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing (#14455)

2023-10-10 Thread showuon
This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 3c9031c6245 KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing (#14455)
3c9031c6245 is described below

commit 3c9031c62455e4eaa3f5d16a3bba94d7e3159fb6
Author: Gantigmaa Selenge <39860586+tinasele...@users.noreply.github.com>
AuthorDate: Wed Oct 11 04:41:46 2023 +0100

KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing (#14455)

AdminClient now throws IllegalStateException instead of TimeoutException if it receives new calls while closing down. This is more consistent with how the Consumer and Producer clients handle new calls after being closed.
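For illustration, a minimal caller-side sketch of the new behavior (the bootstrap address and topic name are placeholders; no running broker should be needed since the call is rejected client-side):

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class ClosedAdminClientExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            Admin admin = Admin.create(props);
            admin.close();
            try {
                // Any call issued after close() now fails fast.
                admin.createTopics(Collections.singleton(new NewTopic("my-topic", 1, (short) 1)))
                     .all().get();
            } catch (ExecutionException e) {
                // The cause is now IllegalStateException rather than TimeoutException.
                System.out.println("Rejected: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }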

Reviewers: Luke Chen, Kirk True, Kamal Chandraprakash, vamossagar12

---
 .../org/apache/kafka/clients/admin/KafkaAdminClient.java |  5 ++---
 .../org/apache/kafka/clients/admin/KafkaAdminClientTest.java | 12 ++++++++++++
 .../kafka/api/PlaintextAdminIntegrationTest.scala            |  4 ++--
 3 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 4db6b271946..27d28b5b336 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1546,9 +1546,8 @@ public class KafkaAdminClient extends AdminClient {
  */
 void call(Call call, long now) {
 if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
-log.debug("The AdminClient is not accepting new calls. Timing 
out {}.", call);
-call.handleTimeoutFailure(time.milliseconds(),
-new TimeoutException("The AdminClient thread is not 
accepting new calls."));
+log.debug("Cannot accept new call {} when AdminClient is 
closing.", call);
+call.handleFailure(new IllegalStateException("Cannot accept 
new calls when AdminClient is closing."));
 } else if (metadataManager.usingBootstrapControllers() &&
 (!call.nodeProvider.supportsUseControllers())) {
            call.fail(now, new UnsupportedEndpointTypeException("This Admin API is not " +
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 229d3119871..378e08b2c45 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -478,6 +478,18 @@ public class KafkaAdminClientTest {
 callbackCalled.acquire();
 }
 
+    @Test
+    public void testAdminClientFailureWhenClosed() {
+        MockTime time = new MockTime();
+        AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(3, 0));
+        env.adminClient().close();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> env.adminClient().createTopics(
+            singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
+            new CreateTopicsOptions().timeoutMs(1)).all().get());
+        assertTrue(e.getCause() instanceof IllegalStateException,
+            "Expected an IllegalStateException error, but got " + Utils.stackTrace(e));
+    }
+
    private static OffsetDeleteResponse prepareOffsetDeleteResponse(Errors error) {
 return new OffsetDeleteResponse(
 new OffsetDeleteResponseData()
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 784374d23e8..5bb3533146c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1026,7 +1026,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 
   /**
    * Test closing the AdminClient with a generous timeout.  Calls in progress should be completed,
-   * since they can be done within the timeout.  New calls should receive timeouts.
+   * since they can be done within the timeout.  New calls should receive exceptions.
    */
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
@@ -1037,7 +1037,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
 client.close(time.

[kafka-site] branch asf-site updated: Added a blog entry for 3.6.0 release (#547)

2023-10-10 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new d3a5afeb Added a blog entry for 3.6.0 release (#547)
d3a5afeb is described below

commit d3a5afeb4ed3c958f8f77c6bbf34ac137533eb82
Author: Satish Duggana 
AuthorDate: Tue Oct 10 20:29:35 2023 -0700

Added a blog entry for 3.6.0 release (#547)

Reviewers: David Arthur, Greg Harris, Christo Lolov, Gaurav Narula, Divij Vaidya, Mickael Maison, Justine Olshan
---
 blog.html | 138 ++
 1 file changed, 138 insertions(+)

diff --git a/blog.html b/blog.html
index c339be70..8774d7cf 100644
--- a/blog.html
+++ b/blog.html
@@ -22,6 +22,144 @@
 
 
 Blog
+
+
+
+Apache Kafka 3.6.0 Release Announcement
+
+10 Oct 2023 - Satish Duggana (@SatishDuggana, https://twitter.com/0xeed)
+We are proud to announce the release of Apache Kafka 3.6.0. This release contains many new features and improvements. This blog post will highlight some of the more prominent features. For a full list of changes, be sure to check the release notes: https://downloads.apache.org/kafka/3.6.0/RELEASE_NOTES.html
+See the "Upgrading to 3.6.0 from any version 0.8.x through 3.5.x" section in the documentation (https://kafka.apache.org/36/documentation.html#upgrade_3_6_0) for the list of notable changes and detailed upgrade steps.
+
+The ability to migrate Kafka clusters from a ZooKeeper metadata system to a KRaft metadata system is
+now ready for usage in production environments. See the ZooKeeper to KRaft migration
+operations documentation (https://kafka.apache.org/documentation/#kraft_zk_migration) for
+details. Note that support for JBOD is still not available for KRaft clusters, therefore clusters
+utilizing JBOD cannot be migrated. See KIP-858 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft)
+for details regarding KRaft and JBOD.
+
+Support for Delegation Tokens in KRaft (KAFKA-15219, https://issues.apache.org/jira/browse/KAFKA-15219) was completed in 3.6, further reducing the feature gap between ZooKeeper-based Kafka clusters and KRaft. Migration of delegation tokens from ZooKeeper to KRaft is also included in 3.6.
+Tiered Storage (KIP-405, https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage) is an early access feature. It is currently only suitable for testing in non-production environments. See the Early Access Release Notes (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+Early+Access+Release+Notes) for more details.
+
+Note: ZooKeeper has been marked as deprecated since the 3.5.0 release and is planned to be removed in Apache Kafka 4.0. For more information, please see the documentation for ZooKeeper Deprecation.
+Kafka Broker, Controller, Producer, Consumer and Admin Client
+
+KIP-405: Kafka Tiered Storage (Early Access) (https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage):
+Introduces Tiered Storage to Kafka. Note that this is an early access feature only advised for use in
+non-production environments (see the early access notes for more information:
+https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+Early+Access+Release+Notes).
+This feature provides a separation of computation and storage in the broker for pluggable storage
+tiering natively in Kafka. Tiered Storage brings a seamless extension of storage to remote objects with
+minimal operational changes.
+
+KIP-890: Transactions Server Side Defense (Part 1) (https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense):
+Hanging transactions can negatively impact your read committed consumers and prevent compacted
+logs from being compacted. KIP-890 helps address hanging transactions by verifying partition
+additions. Part 2 of KIP-890 will optimize verification, which currently adds an extra hop.
+In 3.6.0, transaction verification will prevent hanging transactions on data partitions. In the next
+release, transactional offset commits will also be covered.
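For context, the broker-side verification in KIP-890 part 1 requires no client code changes; a standard transactional producer like the minimal sketch below (bootstrap address, transactional id, and topic name are placeholders) benefits automatically, because the broker now verifies that each partition was added to the transaction before accepting the write.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TransactionalProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();
                producer.beginTransaction();
                // With 3.6.0 brokers, this write is only accepted after the broker
                // verifies the partition was added to the ongoing transaction.
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));
                producer.commitTransaction();
            }
        }
    }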

[kafka-site] branch asf-site updated: MINOR Update default docs point to 3.6.0 release docs (#556)

2023-10-10 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 3381b69c MINOR Update default docs point to 3.6.0 release docs (#556)
3381b69c is described below

commit 3381b69c04601f4cda6aedd2fe32462c1f8b19d2
Author: Satish Duggana 
AuthorDate: Tue Oct 10 20:06:49 2023 -0700

MINOR Update default docs point to 3.6.0 release docs (#556)

Reviewers: Luke Chen, Divij Vaidya
---
 35/documentation.html  |  2 +-
 35/streams/architecture.html   |  2 +-
 35/streams/core-concepts.html  |  2 +-
 35/streams/developer-guide/app-reset-tool.html |  2 +-
 35/streams/developer-guide/config-streams.html |  2 +-
 35/streams/developer-guide/datatypes.html  |  2 +-
 35/streams/developer-guide/dsl-api.html|  2 +-
 .../developer-guide/dsl-topology-naming.html   |  2 +-
 35/streams/developer-guide/index.html  |  2 +-
 .../developer-guide/interactive-queries.html   |  2 +-
 35/streams/developer-guide/manage-topics.html  |  2 +-
 35/streams/developer-guide/memory-mgmt.html|  2 +-
 35/streams/developer-guide/processor-api.html  |  2 +-
 35/streams/developer-guide/running-app.html|  2 +-
 35/streams/developer-guide/security.html   |  2 +-
 35/streams/developer-guide/testing.html|  2 +-
 35/streams/developer-guide/write-streams.html  |  2 +-
 35/streams/index.html  |  2 +-
 35/streams/quickstart.html |  2 +-
 35/streams/tutorial.html   |  2 +-
 35/streams/upgrade-guide.html  |  2 +-
 documentation.html |  2 +-
 documentation/index.html   |  2 +-
 documentation/streams/architecture.html|  2 +-
 documentation/streams/core-concepts.html   |  2 +-
 .../streams/developer-guide/app-reset-tool.html|  2 +-
 .../streams/developer-guide/config-streams.html|  2 +-
 .../streams/developer-guide/datatypes.html |  2 +-
 documentation/streams/developer-guide/dsl-api.html |  2 +-
 .../developer-guide/dsl-topology-naming.html   |  2 +-
 documentation/streams/developer-guide/index.html   |  2 +-
 .../developer-guide/interactive-queries.html   |  2 +-
 .../streams/developer-guide/manage-topics.html |  2 +-
 .../streams/developer-guide/memory-mgmt.html   |  2 +-
 .../streams/developer-guide/processor-api.html |  2 +-
 .../streams/developer-guide/running-app.html   |  2 +-
 .../streams/developer-guide/security.html  |  2 +-
 documentation/streams/developer-guide/testing.html |  2 +-
 .../streams/developer-guide/write-streams.html |  2 +-
 documentation/streams/index.html   |  2 +-
 documentation/streams/quickstart.html  |  2 +-
 documentation/streams/upgrade-guide.html   |  2 +-
 downloads.html | 31 +-
 intro.html |  2 +-
 protocol.html  |  2 +-
 quickstart.html|  2 +-
 uses.html  |  2 +-
 47 files changed, 76 insertions(+), 47 deletions(-)

diff --git a/35/documentation.html b/35/documentation.html
index 83b59b93..58cbd332 100644
--- a/35/documentation.html
+++ b/35/documentation.html
@@ -30,7 +30,7 @@
 
   
   
-
+
 
 Documentation
 Kafka 3.5 Documentation
diff --git a/35/streams/architecture.html b/35/streams/architecture.html
index f0561296..139f5e3f 100644
--- a/35/streams/architecture.html
+++ b/35/streams/architecture.html
@@ -179,7 +179,7 @@
 
 
 
-
+
 
 Documentation
 Kafka Streams
diff --git a/35/streams/core-concepts.html b/35/streams/core-concepts.html
index d9a2851e..07645ec1 100644
--- a/35/streams/core-concepts.html
+++ b/35/streams/core-concepts.html
@@ -351,7 +351,7 @@
 
 
 
-
+
 
 Documentation
 Kafka Streams
diff --git a/35/streams/developer-guide/app-reset-tool.html b/35/streams/developer-guide/app-reset-tool.html
index a877b54f..46ba2e21 100644
--- a/35/streams/developer-guide/app-reset-tool.html
+++ b/35/streams/developer-guide/app-reset-tool.html
@@ -167,7 +167,7 @@
 
   
   
-
+
 
   Documentation
   Kafka Streams
diff --git a/35/streams/developer-guide/config-streams.html b/35/streams/developer-guide/config-streams.html
index a38e8b3d..65a2a6b2 100644
--- a/35/streams/developer-guide/config-streams.html
+++ b/35/streams/developer-guide/config-streams.html
@@ -1127,7 +1127,7 @@ streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic

[kafka] branch trunk updated: KAFKA-15169: Added TestCase in RemoteIndexCache (#14482)

2023-10-10 Thread showuon
This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 99ce2e081c5 KAFKA-15169: Added TestCase in RemoteIndexCache (#14482)
99ce2e081c5 is described below

commit 99ce2e081c575eeb6f02a046e3b6530ca6402ad2
Author: Arpit Goyal <59436466+iit2009...@users.noreply.github.com>
AuthorDate: Wed Oct 11 08:28:17 2023 +0530

KAFKA-15169: Added TestCase in RemoteIndexCache (#14482)

Test Cases Covered

1. Index files already exist on disk but not in the cache, i.e. RemoteIndexCache should not call the remoteStorageManager to fetch them and should instead cache them from the local index files present (see the sketch after this list).
2. RSM returns a corrupted index file, i.e. RemoteIndexCache should throw a CorruptedIndexException instead of executing successfully.
3. Deleted-suffix index files are already present on disk, i.e. if the cleaner thread is slow, there is a chance of deleted index files being present on disk while, in parallel, the same index entry is invalidated. To understand more, refer to https://issues.apache.org/jira/browse/KAFKA-15169

Reviewers: Divij Vaidya, Luke Chen, Kamal Chandraprakash
---
 .../kafka/log/remote/RemoteIndexCacheTest.scala| 275 +++--
 .../storage/internals/log/RemoteIndexCache.java|   4 +-
 2 files changed, 260 insertions(+), 19 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index b9482217062..07f07757b31 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -17,25 +17,28 @@
 package kafka.log.remote
 
 import kafka.utils.TestUtils
+import kafka.utils.TestUtils.waitUntilTrue
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
-import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.{File, FileInputStream, IOException, PrintWriter}
-import java.nio.file.Files
+import java.io.{File, FileInputStream, FileNotFoundException, IOException, PrintWriter}
+import java.nio.file.{Files, Paths}
 import java.util
-import java.util.Collections
+import java.util.{Collections, Optional}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import scala.collection.mutable
 
@@ -516,10 +519,14 @@ class RemoteIndexCacheTest {
   def testClearCacheAndIndexFilesWhenResizeCache(): Unit = {
 
 def getIndexFileFromRemoteCacheDir(suffix: String) = {
-  Files.walk(cache.cacheDir())
-.filter(Files.isRegularFile(_))
-.filter(path => path.getFileName.toString.endsWith(suffix))
-.findAny()
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: FileNotFoundException => Optional.empty()
+  }
 }
 
    val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
@@ -554,23 +561,223 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
-  @Test
-  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
-// create Corrupt Offset Index File
-createCorruptRemoteIndexCacheOffsetFile()
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
+  def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = {
+    // create Corrupted Index File in r

[kafka] branch trunk updated: KAFKA-14927: Add validation for config keys in ConfigCommand tool (#14514)

2023-10-10 Thread manikumar
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 6e164bb9ace KAFKA-14927: Add validation for config keys in ConfigCommand tool (#14514)
6e164bb9ace is described below

commit 6e164bb9ace3ea7a1a9542904d1a01c9fd3a1b48
Author: Aman Singh <103091061+singhn...@users.noreply.github.com>
AuthorDate: Tue Oct 10 13:19:13 2023 +0530

KAFKA-14927: Add validation for config keys in ConfigCommand tool (#14514)

Added validation in the ConfigCommand tool: only characters matching '[a-zA-Z0-9._-]*' are allowed in config keys.
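A minimal standalone sketch of the accepted key shape (plain Java for illustration; the actual ConfigCommand implementation is Scala, shown in the diff below):

    import java.util.regex.Pattern;

    public class ConfigKeyCheck {
        private static final Pattern VALID_KEY = Pattern.compile("[a-zA-Z0-9._-]*");

        static void validate(String key) {
            if (!VALID_KEY.matcher(key).matches()) {
                throw new IllegalArgumentException("Invalid character found for config key: " + key);
            }
        }

        public static void main(String[] args) {
            validate("log.retention.ms"); // accepted
            validate("a;c");              // throws IllegalArgumentException
        }
    }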

Reviewers: Manikumar Reddy 
---
 core/src/main/scala/kafka/admin/ConfigCommand.scala  | 13 +++--
 .../scala/unit/kafka/admin/ConfigCommandTest.scala   | 20 ++--
 2 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 309d95aaab8..25d400918f6 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -297,20 +297,29 @@ object ConfigCommand extends Logging {
 "This configuration will be ignored if the version is newer than the 
inter.broker.protocol.version specified in the broker or " +
 "if the inter.broker.protocol.version is 3.0 or newer. This 
configuration is deprecated and it will be removed in Apache Kafka 4.0.")
 }
+validatePropsKey(props)
 props
   }
 
  private[admin] def parseConfigsToBeDeleted(opts: ConfigCommandOptions): Seq[String] = {
 if (opts.options.has(opts.deleteConfig)) {
      val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfig).asScala.map(_.trim())
-  val propsToBeDeleted = new Properties
-  configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
   configsToBeDeleted
 }
 else
   Seq.empty
   }
 
+  private def validatePropsKey(props: Properties): Unit = {
+props.keySet.forEach { propsKey =>
+  if (!propsKey.toString.matches("[a-zA-Z0-9._-]*")) {
+throw new IllegalArgumentException(
+  s"Invalid character found for config key: ${propsKey}"
+)
+  }
+}
+  }
+
   private def processCommand(opts: ConfigCommandOptions): Unit = {
 val props = if (opts.options.has(opts.commandConfigOpt))
   Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 124227dd1f5..12e460a0bd3 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -304,15 +304,31 @@ class ConfigCommandTest extends Logging {
 createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
   shortFlag, "1",
   "--alter",
-  "--add-config", "a=b,c=,d=e,f="))
+  "--add-config", "a._-c=b,c=,d=e,f="))
 createOpts.checkArgs()
 
 val addedProps2 = ConfigCommand.parseConfigsToBeAdded(createOpts)
 assertEquals(4, addedProps2.size())
-assertEquals("b", addedProps2.getProperty("a"))
+assertEquals("b", addedProps2.getProperty("a._-c"))
 assertEquals("e", addedProps2.getProperty("d"))
 assertTrue(addedProps2.getProperty("c").isEmpty)
 assertTrue(addedProps2.getProperty("f").isEmpty)
+
+    var inValidCreateOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+  shortFlag, "1",
+  "--alter",
+  "--add-config", "a;c=b"))
+
+assertThrows(classOf[IllegalArgumentException],
+  () => ConfigCommand.parseConfigsToBeAdded(inValidCreateOpts))
+
+    inValidCreateOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+  shortFlag, "1",
+  "--alter",
+  "--add-config", "a,=b"))
+
+assertThrows(classOf[IllegalArgumentException],
+  () => ConfigCommand.parseConfigsToBeAdded(inValidCreateOpts))
   }
 
   @Test