kafka git commit: KAFKA-3502; move RocksDB options construction to init()

2017-01-17 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 eb62e5695 -> e38794e02


KAFKA-3502; move RocksDB options construction to init()

In RocksDBStore, options / wOptions / fOptions are constructed in the 
constructor and must be released in the close() call; however, in some tests 
the generated topology is never initialized, so the corresponding state stores 
cannot be closed either, since their `init` function is never called. As a 
result, these option objects may never be released.

This patch moves that logic out of the constructor and into the `init` 
functions, so that no RocksDB objects are created in the constructor at all. 
It also includes some minor cleanups:

1. In KStreamTestDriver.close(), the logic to close the state stores had been 
lost, leaving only the `flush` call; it is now changed back to call both.
2. Moved the forwarding logic from KStreamTestDriver to MockProcessorContext to 
remove the mutual dependency: these functions should really be in 
ProcessorContext, not the test driver.
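The lifecycle change described above follows a general pattern: native resources are created in `init` rather than in the constructor, so `close` only ever has to release what `init` actually created. A minimal sketch of that pattern (SketchStore and NativeHandle are illustrative stand-ins, not Kafka's actual classes):

```java
// Sketch of the init()/close() pairing described above. NativeHandle plays
// the role of a RocksDB Options object that must be released explicitly.
public class SketchStore {
    static final class NativeHandle {
        boolean released = false;
        void release() { released = true; }
    }

    private final String name;     // plain configuration, safe to hold
    private NativeHandle options;  // NOT created in the constructor

    public SketchStore(final String name) {
        this.name = name;          // constructor only records configuration
    }

    public void init() {
        options = new NativeHandle();  // native resources created here
    }

    public void close() {
        // close() releases exactly what init() created; a store that was
        // never initialized owns nothing, so close() is a safe no-op
        if (options != null) {
            options.release();
            options = null;
        }
    }

    public boolean isOpen() {
        return options != null && !options.released;
    }
}
```

With this arrangement, a test that builds a topology but never initializes it leaks nothing, which is the failure mode the patch fixes.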

Author: Guozhang Wang 

Reviewers: Damian Guy , Matthias J. Sax 
, Jason Gustafson 

Closes #2381 from guozhangwang/K3502-pure-virtual-function-unit-tests

(cherry picked from commit 1974e1b0e54abe5fdebd8ff3338df864b7ab60f3)
Signed-off-by: Jason Gustafson 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e38794e0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e38794e0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e38794e0

Branch: refs/heads/0.10.2
Commit: e38794e020951adec5a5d0bbfe42c57294bf67bd
Parents: eb62e56
Author: Guozhang Wang 
Authored: Tue Jan 17 20:29:55 2017 -0800
Committer: Jason Gustafson 
Committed: Tue Jan 17 20:37:00 2017 -0800

--
 .../streams/state/internals/RocksDBStore.java   |  21 +---
 .../streams/kstream/KStreamBuilderTest.java |  58 +-
 .../internals/KStreamKTableLeftJoinTest.java|   2 -
 ...reamSessionWindowAggregateProcessorTest.java |   4 +-
 .../streams/state/KeyValueStoreTestDriver.java  |   2 +-
 .../internals/CachingKeyValueStoreTest.java |   2 +-
 .../internals/CachingSessionStoreTest.java  |   2 +-
 .../state/internals/CachingWindowStoreTest.java |   2 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java|   3 +-
 .../ChangeLoggingKeyValueStoreTest.java |   3 +-
 .../ChangeLoggingSegmentedBytesStoreTest.java   |   3 +-
 .../MeteredSegmentedBytesStoreTest.java |   3 +-
 .../RocksDBKeyValueStoreSupplierTest.java   |   9 +-
 .../RocksDBSegmentedBytesStoreTest.java |   3 +-
 .../RocksDBSessionStoreSupplierTest.java|   9 +-
 .../internals/RocksDBSessionStoreTest.java  |   3 +-
 .../RocksDBWindowStoreSupplierTest.java |   9 +-
 .../state/internals/RocksDBWindowStoreTest.java |  24 ++--
 .../state/internals/SegmentIteratorTest.java|  27 +++--
 .../streams/state/internals/SegmentsTest.java   |   3 +-
 .../apache/kafka/test/KStreamTestDriver.java| 113 +--
 .../apache/kafka/test/MockProcessorContext.java |  57 +++---
 22 files changed, 153 insertions(+), 209 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
--
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 3f8d509..55c1bb8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -94,25 +94,21 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     protected volatile boolean open = false;
 
-
-    public RocksDBStore(final String name,
-                        final Serde<K> keySerde,
-                        final Serde<V> valueSerde) {
+    RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
         this(name, DB_FILE_DIR, keySerde, valueSerde);
     }
 
-
-    public RocksDBStore(final String name,
-                        final String parentDir,
-                        final Serde<K> keySerde,
-                        final Serde<V> valueSerde) {
+    RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> valueSerde) {
         this.name = name;
         this.parentDir = parentDir;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
+    }
 
+    @SuppressWarnings("unchecked")
+    public void openDB(ProcessorContext context) {
 

kafka git commit: KAFKA-3502; move RocksDB options construction to init()

2017-01-17 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/trunk fd6d7bcf3 -> 1974e1b0e


KAFKA-3502; move RocksDB options construction to init()

In RocksDBStore, options / wOptions / fOptions are constructed in the 
constructor and must be released in the close() call; however, in some tests 
the generated topology is never initialized, so the corresponding state stores 
cannot be closed either, since their `init` function is never called. As a 
result, these option objects may never be released.

This patch moves that logic out of the constructor and into the `init` 
functions, so that no RocksDB objects are created in the constructor at all. 
It also includes some minor cleanups:

1. In KStreamTestDriver.close(), the logic to close the state stores had been 
lost, leaving only the `flush` call; it is now changed back to call both.
2. Moved the forwarding logic from KStreamTestDriver to MockProcessorContext to 
remove the mutual dependency: these functions should really be in 
ProcessorContext, not the test driver.

Author: Guozhang Wang 

Reviewers: Damian Guy , Matthias J. Sax 
, Jason Gustafson 

Closes #2381 from guozhangwang/K3502-pure-virtual-function-unit-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1974e1b0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1974e1b0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1974e1b0

Branch: refs/heads/trunk
Commit: 1974e1b0e54abe5fdebd8ff3338df864b7ab60f3
Parents: fd6d7bc
Author: Guozhang Wang 
Authored: Tue Jan 17 20:29:55 2017 -0800
Committer: Jason Gustafson 
Committed: Tue Jan 17 20:31:31 2017 -0800

--
 .../streams/state/internals/RocksDBStore.java   |  21 +---
 .../streams/kstream/KStreamBuilderTest.java |  58 +-
 .../internals/KStreamKTableLeftJoinTest.java|   2 -
 ...reamSessionWindowAggregateProcessorTest.java |   4 +-
 .../streams/state/KeyValueStoreTestDriver.java  |   2 +-
 .../internals/CachingKeyValueStoreTest.java |   2 +-
 .../internals/CachingSessionStoreTest.java  |   2 +-
 .../state/internals/CachingWindowStoreTest.java |   2 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java|   3 +-
 .../ChangeLoggingKeyValueStoreTest.java |   3 +-
 .../ChangeLoggingSegmentedBytesStoreTest.java   |   3 +-
 .../MeteredSegmentedBytesStoreTest.java |   3 +-
 .../RocksDBKeyValueStoreSupplierTest.java   |   9 +-
 .../RocksDBSegmentedBytesStoreTest.java |   3 +-
 .../RocksDBSessionStoreSupplierTest.java|   9 +-
 .../internals/RocksDBSessionStoreTest.java  |   3 +-
 .../RocksDBWindowStoreSupplierTest.java |   9 +-
 .../state/internals/RocksDBWindowStoreTest.java |  24 ++--
 .../state/internals/SegmentIteratorTest.java|  27 +++--
 .../streams/state/internals/SegmentsTest.java   |   3 +-
 .../apache/kafka/test/KStreamTestDriver.java| 113 +--
 .../apache/kafka/test/MockProcessorContext.java |  57 +++---
 22 files changed, 153 insertions(+), 209 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
--
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 3f8d509..55c1bb8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -94,25 +94,21 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     protected volatile boolean open = false;
 
-
-    public RocksDBStore(final String name,
-                        final Serde<K> keySerde,
-                        final Serde<V> valueSerde) {
+    RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
         this(name, DB_FILE_DIR, keySerde, valueSerde);
     }
 
-
-    public RocksDBStore(final String name,
-                        final String parentDir,
-                        final Serde<K> keySerde,
-                        final Serde<V> valueSerde) {
+    RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> valueSerde) {
         this.name = name;
         this.parentDir = parentDir;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
+    }
 
+    @SuppressWarnings("unchecked")
+    public void openDB(ProcessorContext context) {
         // initialize the default rocksdb options
-        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+

kafka git commit: KAFKA-4591; Create Topic Policy follow-up

2017-01-17 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 6f72a5a53 -> eb62e5695


KAFKA-4591; Create Topic Policy follow-up

1. Added javadoc to public classes
2. Removed `s` from config name for consistency with interface name
3. The policy interface now implements Configurable and AutoCloseable as per 
the KIP
4. Use `null` instead of `-1` in `RequestMetadata`
5. Perform all broker validation before invoking the policy
6. Add tests
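Items 3 and 4 above can be illustrated with a rough sketch. This is a simplified local mirror of the policy shape, not the real `org.apache.kafka.server.policy.CreateTopicPolicy` interface, and `MinReplicationPolicy` is a hypothetical example policy:

```java
import java.util.Map;

// Simplified mirror of the policy shape: a Configurable-style configure(),
// AutoCloseable, and a validate() that throws on violation. Names here are
// illustrative, not the actual Kafka API.
public class PolicySketch {
    static class PolicyViolationException extends RuntimeException {
        PolicyViolationException(final String message) { super(message); }
    }

    interface TopicPolicy extends AutoCloseable {
        void configure(Map<String, ?> configs);
        // Boxed types so "not set" is represented as null rather than -1,
        // matching cleanup item 4 above.
        void validate(String topic, Integer numPartitions, Short replicationFactor);
        @Override void close();
    }

    // Hypothetical example policy: enforce a minimum replication factor.
    static class MinReplicationPolicy implements TopicPolicy {
        private short minReplicationFactor = 1;

        @Override
        public void configure(final Map<String, ?> configs) {
            final Object value = configs.get("min.replication.factor");
            if (value != null)
                minReplicationFactor = Short.parseShort(value.toString());
        }

        @Override
        public void validate(final String topic, final Integer numPartitions,
                             final Short replicationFactor) {
            if (replicationFactor != null && replicationFactor < minReplicationFactor)
                throw new PolicyViolationException("Topic " + topic
                        + ": replication factor " + replicationFactor
                        + " is below the minimum " + minReplicationFactor);
        }

        @Override
        public void close() { }
    }
}
```

Making the interface `Configurable` and `AutoCloseable` lets the broker pass its config to the policy at startup and release any resources it holds at shutdown.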

Author: Ismael Juma 

Reviewers: Jason Gustafson 

Closes #2388 from ijuma/create-topic-policy-docs-and-config-name-change

(cherry picked from commit fd6d7bcf335166a524dc9a29a50c96af8f1c1c02)
Signed-off-by: Ismael Juma 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eb62e569
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eb62e569
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eb62e569

Branch: refs/heads/0.10.2
Commit: eb62e5695506ae13bd37102c3c08e8a067eca0c8
Parents: 6f72a5a
Author: Ismael Juma 
Authored: Wed Jan 18 02:43:10 2017 +
Committer: Ismael Juma 
Committed: Wed Jan 18 02:43:37 2017 +

--
 .../common/errors/PolicyViolationException.java |  3 +
 .../apache/kafka/common/protocol/Errors.java|  2 +-
 .../kafka/server/policy/CreateTopicPolicy.java  | 72 ++--
 .../src/main/scala/kafka/admin/AdminUtils.scala | 36 ++
 .../main/scala/kafka/server/AdminManager.scala  | 56 ++-
 .../main/scala/kafka/server/KafkaConfig.scala   |  6 +-
 .../AbstractCreateTopicsRequestTest.scala   |  2 +-
 .../CreateTopicsRequestWithPolicyTest.scala | 59 +---
 .../unit/kafka/server/KafkaConfigTest.scala |  2 +-
 9 files changed, 183 insertions(+), 55 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/eb62e569/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
index 7923444..393a6df 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.common.errors;
 
+/**
+ * Exception thrown if a create topics request does not satisfy the configured policy for a topic.
+ */
 public class PolicyViolationException extends ApiException {
 
 public PolicyViolationException(String message) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb62e569/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index f30f889..e7689e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -166,7 +166,7 @@ public enum Errors {
 " the message was sent to an incompatible broker. See the broker 
logs for more details.")),
 UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
 new UnsupportedForMessageFormatException("The message format version 
on the broker does not support the request.")),
-POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do 
not satisfy the system policy."));
+POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do 
not satisfy the configured policy."));
 
 private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb62e569/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java 
b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
index 94f1e76..22a7c1d 100644
--- 
a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
+++ 
b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
@@ -13,42 +13,90 @@
 
 package org.apache.kafka.server.policy;
 
+import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.errors.PolicyViolationException;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-public interface CreateTopicPolicy {
+/**
+ * An interface for enforcing a policy on create topics requests.
+ *
+ * 

kafka git commit: KAFKA-4591; Create Topic Policy follow-up

2017-01-17 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/trunk e3bdc84d8 -> fd6d7bcf3


KAFKA-4591; Create Topic Policy follow-up

1. Added javadoc to public classes
2. Removed `s` from config name for consistency with interface name
3. The policy interface now implements Configurable and AutoCloseable as per 
the KIP
4. Use `null` instead of `-1` in `RequestMetadata`
5. Perform all broker validation before invoking the policy
6. Add tests

Author: Ismael Juma 

Reviewers: Jason Gustafson 

Closes #2388 from ijuma/create-topic-policy-docs-and-config-name-change


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fd6d7bcf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fd6d7bcf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fd6d7bcf

Branch: refs/heads/trunk
Commit: fd6d7bcf335166a524dc9a29a50c96af8f1c1c02
Parents: e3bdc84
Author: Ismael Juma 
Authored: Wed Jan 18 02:43:10 2017 +
Committer: Ismael Juma 
Committed: Wed Jan 18 02:43:10 2017 +

--
 .../common/errors/PolicyViolationException.java |  3 +
 .../apache/kafka/common/protocol/Errors.java|  2 +-
 .../kafka/server/policy/CreateTopicPolicy.java  | 72 ++--
 .../src/main/scala/kafka/admin/AdminUtils.scala | 36 ++
 .../main/scala/kafka/server/AdminManager.scala  | 56 ++-
 .../main/scala/kafka/server/KafkaConfig.scala   |  6 +-
 .../AbstractCreateTopicsRequestTest.scala   |  2 +-
 .../CreateTopicsRequestWithPolicyTest.scala | 59 +---
 .../unit/kafka/server/KafkaConfigTest.scala |  2 +-
 9 files changed, 183 insertions(+), 55 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
index 7923444..393a6df 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.common.errors;
 
+/**
+ * Exception thrown if a create topics request does not satisfy the configured policy for a topic.
+ */
 public class PolicyViolationException extends ApiException {
 
 public PolicyViolationException(String message) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index f30f889..e7689e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -166,7 +166,7 @@ public enum Errors {
 " the message was sent to an incompatible broker. See the broker 
logs for more details.")),
 UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
 new UnsupportedForMessageFormatException("The message format version 
on the broker does not support the request.")),
-POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do 
not satisfy the system policy."));
+POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do 
not satisfy the configured policy."));
 
 private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java 
b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
index 94f1e76..22a7c1d 100644
--- 
a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
+++ 
b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
@@ -13,42 +13,90 @@
 
 package org.apache.kafka.server.policy;
 
+import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.errors.PolicyViolationException;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-public interface CreateTopicPolicy {
+/**
+ * An interface for enforcing a policy on create topics requests.
+ *
+ * Common use cases are requiring that the replication factor, min.insync.replicas and/or retention settings for a
+ * 

kafka git commit: MINOR: remove ZK from system tests

2017-01-17 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 73b7ae001 -> e3bdc84d8


MINOR: remove ZK from system tests

Author: Matthias J. Sax 

Reviewers: Guozhang Wang 

Closes #2391 from mjsax/kafka-4060-zk-follow-up-system-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e3bdc84d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e3bdc84d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e3bdc84d

Branch: refs/heads/trunk
Commit: e3bdc84d864a5ca655f985473770f7b7cb24b79b
Parents: 73b7ae0
Author: Matthias J. Sax 
Authored: Tue Jan 17 18:14:43 2017 -0800
Committer: Guozhang Wang 
Committed: Tue Jan 17 18:14:43 2017 -0800

--
 .../kafka/streams/perf/SimpleBenchmark.java | 26 +---
 .../streams/smoketest/ShutdownDeadlockTest.java |  7 +-
 .../streams/smoketest/SmokeTestClient.java  |  8 +++---
 .../streams/smoketest/SmokeTestDriver.java  |  8 +++---
 .../streams/smoketest/StreamsSmokeTest.java | 10 +++-
 5 files changed, 23 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/e3bdc84d/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
--
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java 
b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 7ba6161..fb26206 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -57,7 +57,6 @@ import java.util.Random;
 public class SimpleBenchmark {
 
 private final String kafka;
-private final String zookeeper;
 private final File stateDir;
 
 private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
@@ -91,18 +90,16 @@ public class SimpleBenchmark {
 private static final Serde BYTE_SERDE = Serdes.ByteArray();
 private static final Serde INTEGER_SERDE = Serdes.Integer();
 
-public SimpleBenchmark(File stateDir, String kafka, String zookeeper) {
+public SimpleBenchmark(File stateDir, String kafka) {
 super();
 this.stateDir = stateDir;
 this.kafka = kafka;
-this.zookeeper = zookeeper;
 }
 
 public static void main(String[] args) throws Exception {
         String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        String zookeeper = args.length > 1 ? args[1] : "localhost:2181";
-        String stateDirStr = args.length > 2 ? args[2] : "/tmp/kafka-streams-simple-benchmark";
-        numRecords = args.length > 3 ? Integer.parseInt(args[3]) : 1000;
+        String stateDirStr = args.length > 1 ? args[1] : "/tmp/kafka-streams-simple-benchmark";
+        numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
         endKey = numRecords - 1;
 
 final File stateDir = new File(stateDirStr);
@@ -113,11 +110,10 @@ public class SimpleBenchmark {
         // Note: this output is needed for automated tests and must not be removed
         System.out.println("SimpleBenchmark instance started");
         System.out.println("kafka=" + kafka);
-        System.out.println("zookeeper=" + zookeeper);
         System.out.println("stateDir=" + stateDir);
         System.out.println("numRecords=" + numRecords);
 
-        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper);
+        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka);
 
 // producer performance
         benchmark.produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
@@ -239,7 +235,7 @@ public class SimpleBenchmark {
 public void processStream(String topic) {
 CountDownLatch latch = new CountDownLatch(1);
 
-        final KafkaStreams streams = createKafkaStreams(topic, stateDir, kafka, zookeeper, latch);
+        final KafkaStreams streams = createKafkaStreams(topic, stateDir, kafka, latch);
 
 Thread thread = new Thread() {
 public void run() {
@@ -273,7 +269,7 @@ public class SimpleBenchmark {
 public void processStreamWithSink(String topic) {
 CountDownLatch latch = new CountDownLatch(1);
 
-        final KafkaStreams streams = createKafkaStreamsWithSink(topic, stateDir, kafka, zookeeper, latch);
+        final KafkaStreams streams = createKafkaStreamsWithSink(topic, stateDir, kafka, latch);
 
 Thread thread = new Thread() {
 public void run() {
@@ -337,7 +333,7 @@ public class SimpleBenchmark {
 public void processStreamWithStateStore(String topic) {
 CountDownLatch latch = new 

[1/2] kafka git commit: KAFKA-3452 Follow-up: Refactoring StateStore hierarchies

2017-01-17 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 c9b9acf6a -> 6f72a5a53


http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
--
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 84f1734..abaaffd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.WindowStore;
 
@@ -58,23 +59,35 @@ public class RocksDBWindowStoreSupplier extends AbstractStoreSupplier
-        final RocksDBWindowStore segmentedStore = new RocksDBWindowStore<>(name, retainDuplicates, keySerde, valueSerde,
-                                                                           logged ? new ChangeLoggingSegmentedBytesStore(bytesStore)
-                                                                                  : bytesStore);
-        return new MeteredWindowStore<>(segmentedStore, "rocksdb-window", time);
-    }
+        return maybeWrapCaching(
+            maybeWrapLogged(
+                new RocksDBSegmentedBytesStore(
+                    name,
+                    retentionPeriod,
+                    numSegments,
+                    new WindowStoreKeySchema()
+                )));
 
-        return new CachingWindowStore<>(new MeteredSegmentedBytesStore(logged ? new ChangeLoggingSegmentedBytesStore(bytesStore)
-                                                                              : bytesStore,
-                                                                       "rocksdb-window",
-                                                                       time),
-                                        keySerde, valueSerde, windowSize);
     }
 
 @Override
 public long retentionPeriod() {
 return retentionPeriod;
 }
+
+    private SegmentedBytesStore maybeWrapLogged(final SegmentedBytesStore inner) {
+        if (!logged) {
+            return inner;
+        }
+        return new ChangeLoggingSegmentedBytesStore(inner);
+    }
+
+    private WindowStore maybeWrapCaching(final SegmentedBytesStore inner) {
+        final MeteredSegmentedBytesStore metered = new MeteredSegmentedBytesStore(inner, "rocksdb-window", time);
+        if (!enableCaching) {
+            return new RocksDBWindowStore<>(metered, keySerde, valueSerde, retainDuplicates);
+        }
+        final RocksDBWindowStore windowed = RocksDBWindowStore.bytesStore(metered, retainDuplicates);
+        return new CachingWindowStore<>(windowed, keySerde, valueSerde, windowSize);
+    }
 }
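The maybeWrapLogged / maybeWrapCaching methods in the diff above are conditional decorators: each optional concern (change-logging, caching) wraps the inner store only when enabled. A minimal sketch of that composition idea (Store, LoggingStore, and CachingStore are illustrative names, not the Kafka classes; layers() just records the wrapping order so it can be inspected):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the conditional-decorator composition used above.
public class WrapSketch {
    interface Store {
        List<String> layers();
    }

    static class InnerStore implements Store {
        @Override
        public List<String> layers() {
            final List<String> layers = new ArrayList<>();
            layers.add("rocksdb");
            return layers;
        }
    }

    // Each decorator adds one concern and delegates to the wrapped store.
    static class LoggingStore implements Store {
        private final Store inner;
        LoggingStore(final Store inner) { this.inner = inner; }
        @Override
        public List<String> layers() {
            final List<String> layers = inner.layers();
            layers.add("changelog");
            return layers;
        }
    }

    static class CachingStore implements Store {
        private final Store inner;
        CachingStore(final Store inner) { this.inner = inner; }
        @Override
        public List<String> layers() {
            final List<String> layers = inner.layers();
            layers.add("cache");
            return layers;
        }
    }

    static Store maybeWrapLogged(final Store inner, final boolean logged) {
        return logged ? new LoggingStore(inner) : inner;
    }

    static Store maybeWrapCaching(final Store inner, final boolean caching) {
        return caching ? new CachingStore(inner) : inner;
    }

    // Builds the chain inside-out: innermost store first, optional wrappers after.
    static Store build(final boolean logged, final boolean caching) {
        return maybeWrapCaching(maybeWrapLogged(new InnerStore(), logged), caching);
    }
}
```

The payoff is that each combination of features is expressed as composition rather than a tangle of nested ternaries, which is what the refactor above replaces.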

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
--
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
new file mode 100644
index 000..d76e8a4
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 

[2/2] kafka git commit: KAFKA-3452 Follow-up: Refactoring StateStore hierarchies

2017-01-17 Thread guozhang
KAFKA-3452 Follow-up: Refactoring StateStore hierarchies

This is a follow-up to https://github.com/apache/kafka/pull/2166, refactoring 
the store hierarchies as requested

Author: Damian Guy 

Reviewers: Guozhang Wang 

Closes #2360 from dguy/state-store-refactor


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73b7ae00
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73b7ae00
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73b7ae00

Branch: refs/heads/trunk
Commit: 73b7ae0019d387407375f3865e263225c986a6ce
Parents: 825f225
Author: Damian Guy 
Authored: Tue Jan 17 14:13:46 2017 -0800
Committer: Guozhang Wang 
Committed: Tue Jan 17 14:13:46 2017 -0800

--
 .../kstream/internals/SessionKeySerde.java  |  17 ++
 .../streams/state/WindowStoreIterator.java  |   3 +-
 .../AbstractMergedSortedCacheStoreIterator.java | 166 +++
 .../state/internals/CachingKeyValueStore.java   |  10 +-
 .../state/internals/CachingSessionStore.java|  63 ++
 .../state/internals/CachingWindowStore.java |  64 +++---
 .../ChangeLoggingKeyValueBytesStore.java|  93 +
 .../internals/ChangeLoggingKeyValueStore.java   | 127 
 .../ChangeLoggingSegmentedBytesStore.java   |  28 +--
 .../internals/CompositeReadOnlyWindowStore.java |   5 +
 .../DelegatingPeekingKeyValueIterator.java  |  10 +-
 .../MergedSortedCacheKeyValueStoreIterator.java | 130 ++--
 .../MergedSortedCacheSessionStoreIterator.java  |  71 +++
 .../MergedSortedCacheWindowStoreIterator.java   |  58 ++
 .../MergedSortedCachedWindowStoreIterator.java  | 107 --
 .../state/internals/MeteredKeyValueStore.java   |  27 +--
 .../internals/MeteredSegmentedBytesStore.java   |  27 +--
 .../state/internals/MeteredWindowStore.java | 180 
 .../internals/RocksDBKeyValueStoreSupplier.java |  54 +++--
 .../internals/RocksDBSessionStoreSupplier.java  |  54 +++--
 .../streams/state/internals/RocksDBStore.java   |  28 +--
 .../state/internals/RocksDBWindowStore.java |  25 ++-
 .../internals/RocksDBWindowStoreSupplier.java   |  37 ++--
 .../internals/SerializedKeyValueIterator.java   |  70 +++
 .../state/internals/WindowStoreUtils.java   |   3 +
 .../state/internals/WrappedStateStore.java  |  90 
 .../internals/KGroupedStreamImplTest.java   |  42 +++-
 .../internals/CachingSessionStoreTest.java  |   3 +-
 .../state/internals/CachingWindowStoreTest.java |   5 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java| 165 +++
 .../ChangeLoggingKeyValueStoreTest.java | 207 +++
 .../DelegatingPeekingKeyValueIteratorTest.java  |  12 +-
 ...rgedSortedCacheSessionStoreIteratorTest.java | 113 ++
 ...ergedSortedCacheWindowStoreIteratorTest.java |  35 +++-
 .../internals/ReadOnlyWindowStoreStub.java  |   5 +
 .../RocksDBKeyValueStoreSupplierTest.java   | 155 ++
 .../RocksDBSessionStoreSupplierTest.java| 169 +++
 .../RocksDBWindowStoreSupplierTest.java | 168 +++
 .../state/internals/RocksDBWindowStoreTest.java |   2 -
 .../SerializedKeyValueIteratorTest.java |  95 +
 40 files changed, 2081 insertions(+), 642 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
--
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
index 48213d6..d9a3528 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 
 import java.nio.ByteBuffer;
@@ -146,4 +147,20 @@ public class SessionKeySerde implements Serde {
 buf.putLong(sessionKey.window().start());
 return new Bytes(buf.array());
 }
+
+    public static Bytes bytesToBinary(final Windowed sessionKey) {
+        final byte[] bytes = sessionKey.key().get();
+        ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
+        buf.put(bytes);
+        buf.putLong(sessionKey.window().end());
+

kafka git commit: KAFKA-4588: Wait for topics to be created in QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable

2017-01-17 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 3f6c4f63c -> 825f225bc


KAFKA-4588: Wait for topics to be created in 
QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable

After debugging this, I can see that when it fails there is a race between 
when the topic is actually created/ready on the broker and when the assignment 
happens. When it fails, `StreamPartitionAssignor.assign(..)` gets called with a 
`Cluster` containing no topics, so no tasks get assigned and the test hangs. To 
fix this I added a `waitForTopics` method to `EmbeddedKafkaCluster`, which waits 
until the topics have been created.
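The fix amounts to polling a condition until a deadline. A minimal sketch of such a wait helper (WaitSketch and its predicate parameter are illustrative; the actual EmbeddedKafkaCluster method queries broker metadata instead):

```java
import java.util.Set;
import java.util.function.Predicate;

// Sketch of a waitForTopics-style helper: poll until every requested topic
// is visible (per the supplied check) or the timeout elapses.
public class WaitSketch {
    public static boolean waitForTopics(final Set<String> topics,
                                        final Predicate<String> topicExists,
                                        final long timeoutMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (!topics.stream().allMatch(topicExists)) {
            if (System.currentTimeMillis() >= deadline) {
                return false;  // timed out before all topics appeared
            }
            try {
                Thread.sleep(10);  // brief back-off between checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

Blocking on readiness like this removes the race: assignment cannot proceed against a `Cluster` that has not yet seen the topics.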

Author: Damian Guy 

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2371 from dguy/integration-test-fix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/825f225b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/825f225b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/825f225b

Branch: refs/heads/trunk
Commit: 825f225bc5706b16af8ec44ca47ee1452c11e6f3
Parents: 3f6c4f6
Author: Damian Guy 
Authored: Tue Jan 17 12:33:11 2017 -0800
Committer: Guozhang Wang 
Committed: Tue Jan 17 12:33:11 2017 -0800

--
 checkstyle/import-control.xml   |  1 +
 .../GlobalKTableIntegrationTest.java|  4 +-
 .../KStreamAggregationDedupIntegrationTest.java |  4 +-
 .../KStreamAggregationIntegrationTest.java  |  4 +-
 .../KStreamKTableJoinIntegrationTest.java   |  2 +-
 .../integration/KStreamRepartitionJoinTest.java |  8 ++--
 .../QueryableStateIntegrationTest.java  | 14 +++
 .../integration/utils/EmbeddedKafkaCluster.java | 24 +--
 .../integration/utils/IntegrationTestUtils.java | 43 
 .../integration/utils/KafkaEmbedded.java|  3 ++
 .../org/apache/kafka/test/StreamsTestUtils.java |  1 +
 11 files changed, 87 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/checkstyle/import-control.xml
--
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b68cf98..04f364c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -156,6 +156,7 @@
 
 
   
+  
   
   
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
--
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 85b851d..6ac87ae 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -81,7 +81,7 @@ public class GlobalKTableIntegrationTest {
 private ForeachAction foreachAction;
 
 @Before
-public void before() {
+public void before() throws InterruptedException {
 testNo++;
 builder = new KStreamBuilder();
 createTopics();
@@ -212,7 +212,7 @@ public class GlobalKTableIntegrationTest {
 }
 
 
-private void createTopics() {
+private void createTopics() throws InterruptedException {
 inputStream = "input-stream-" + testNo;
 inputTable = "input-table-" + testNo;
 globalOne = "globalOne-" + testNo;

http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
--
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 9397e03..f2a767c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -72,7 +72,7 @@ public class KStreamAggregationDedupIntegrationTest {
 
 
 @Before
-public void before() {
+public void before() throws InterruptedException {
 testNo++;
 builder = new KStreamBuilder();
 createTopics();
@@ -267,7 +267,7 @@ public class KStreamAggregationDedupIntegrationTest {
 }
 
 
-private void createTopics() {
+private void createTopics() throws InterruptedException {
 

kafka git commit: KAFKA-4588: Wait for topics to be created in QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable

2017-01-17 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 60d759a22 -> c9b9acf6a


KAFKA-4588: Wait for topics to be created in 
QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable

After debugging this I can see that, in the runs where it fails, there is a race
between when the topic is actually created/ready on the broker and when the
assignment happens. When it fails, `StreamPartitionAssignor.assign(..)` gets
called with a `Cluster` that contains no topics, so no tasks get assigned and the
test hangs. To fix this I added a `waitForTopics` method to
`EmbeddedKafkaCluster`, which waits until the topics have been created.

Author: Damian Guy 

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2371 from dguy/integration-test-fix

(cherry picked from commit 825f225bc5706b16af8ec44ca47ee1452c11e6f3)
Signed-off-by: Guozhang Wang 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9b9acf6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9b9acf6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9b9acf6

Branch: refs/heads/0.10.2
Commit: c9b9acf6a8b542c2d0d825c17a4a20cf3fa5
Parents: 60d759a
Author: Damian Guy 
Authored: Tue Jan 17 12:33:11 2017 -0800
Committer: Guozhang Wang 
Committed: Tue Jan 17 12:33:20 2017 -0800

--
 checkstyle/import-control.xml   |  1 +
 .../GlobalKTableIntegrationTest.java|  4 +-
 .../KStreamAggregationDedupIntegrationTest.java |  4 +-
 .../KStreamAggregationIntegrationTest.java  |  4 +-
 .../KStreamKTableJoinIntegrationTest.java   |  2 +-
 .../integration/KStreamRepartitionJoinTest.java |  8 ++--
 .../QueryableStateIntegrationTest.java  | 14 +++
 .../integration/utils/EmbeddedKafkaCluster.java | 24 +--
 .../integration/utils/IntegrationTestUtils.java | 43 
 .../integration/utils/KafkaEmbedded.java|  3 ++
 .../org/apache/kafka/test/StreamsTestUtils.java |  1 +
 11 files changed, 87 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/c9b9acf6/checkstyle/import-control.xml
--
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b68cf98..04f364c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -156,6 +156,7 @@
 
 
   
+  
   
   
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9b9acf6/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
--
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 85b851d..6ac87ae 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -81,7 +81,7 @@ public class GlobalKTableIntegrationTest {
 private ForeachAction foreachAction;
 
 @Before
-public void before() {
+public void before() throws InterruptedException {
 testNo++;
 builder = new KStreamBuilder();
 createTopics();
@@ -212,7 +212,7 @@ public class GlobalKTableIntegrationTest {
 }
 
 
-private void createTopics() {
+private void createTopics() throws InterruptedException {
 inputStream = "input-stream-" + testNo;
 inputTable = "input-table-" + testNo;
 globalOne = "globalOne-" + testNo;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9b9acf6/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
--
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 9397e03..f2a767c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -72,7 +72,7 @@ public class KStreamAggregationDedupIntegrationTest {
 
 
 @Before
-public void before() {
+public void before() throws InterruptedException {
 testNo++;
 builder = new KStreamBuilder();
 createTopics();
@@ -267,7 +267,7 @@ public class KStreamAggregationDedupIntegrationTest {
   

kafka git commit: KAFKA-4580; Use sasl.jaas.config for some system tests

2017-01-17 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 e3f4cdd0e -> 2b19ad9d8


KAFKA-4580; Use sasl.jaas.config for some system tests

Switched console_consumer, verifiable_consumer and verifiable_producer to use
the new sasl.jaas.config property instead of a static JAAS configuration file
when used with SASL_PLAINTEXT.
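For illustration, the new property lets a client embed its JAAS login module directly in the client configuration instead of pointing a JVM flag at a jaas.conf file. A PLAIN example, with placeholder credentials taken from the Kafka security docs, looks like:

```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";
```

Because the configuration is a per-client property, different producers and consumers in the same JVM can use different credentials, which a single static `-Djava.security.auth.login.config` file cannot express.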

Author: Rajini Sivaram 

Reviewers: Ewen Cheslack-Postava , Ismael Juma 


Closes #2323 from rajinisivaram/KAFKA-4580

(cherry picked from commit 3f6c4f63c9c17424cf717ca76c74554bcf3b2e9a)
Signed-off-by: Ismael Juma 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2b19ad9d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2b19ad9d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2b19ad9d

Branch: refs/heads/0.10.2
Commit: 2b19ad9d8c47fb0f78a6e90d2f5711df6110bf1f
Parents: e3f4cdd
Author: Rajini Sivaram 
Authored: Tue Jan 17 18:42:55 2017 +
Committer: Ismael Juma 
Committed: Tue Jan 17 18:43:25 2017 +

--
 tests/kafkatest/services/console_consumer.py|  4 +--
 .../services/security/security_config.py| 28 +++-
 .../services/security/templates/jaas.conf   |  4 +++
 tests/kafkatest/services/verifiable_consumer.py |  5 ++--
 tests/kafkatest/services/verifiable_producer.py |  9 +++
 5 files changed, 35 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/2b19ad9d/tests/kafkatest/services/console_consumer.py
--
diff --git a/tests/kafkatest/services/console_consumer.py 
b/tests/kafkatest/services/console_consumer.py
index 17ddb6b..cdc46cd 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -150,7 +150,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
 
 # Add security properties to the config. If security protocol is not 
specified,
 # use the default in the template properties.
-self.security_config = 
self.kafka.security_config.client_config(prop_file)
+self.security_config = 
self.kafka.security_config.client_config(prop_file, node)
+self.security_config.setup_node(node)
 
 prop_file += str(self.security_config)
 return prop_file
@@ -231,7 +232,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
 prop_file = self.prop_file(node)
 self.logger.info(prop_file)
 node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
-self.security_config.setup_node(node)
 
 # Create and upload log properties
 log_config = self.render('tools_log4j.properties', 
log_file=ConsoleConsumer.LOG_FILE)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b19ad9d/tests/kafkatest/services/security/security_config.py
--
diff --git a/tests/kafkatest/services/security/security_config.py 
b/tests/kafkatest/services/security/security_config.py
index 864e0a3..846d9b1 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -112,7 +112,7 @@ class SecurityConfig(TemplateRenderer):
 
 def __init__(self, context, security_protocol=None, 
interbroker_security_protocol=None,
  client_sasl_mechanism=SASL_MECHANISM_GSSAPI, 
interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
- zk_sasl=False, template_props=""):
+ zk_sasl=False, template_props="", static_jaas_conf=True):
 """
 Initialize the security properties for the node and copy
 keystore and truststore to the remote node if the transport protocol 
@@ -143,6 +143,7 @@ class SecurityConfig(TemplateRenderer):
 self.has_sasl = self.is_sasl(security_protocol) or 
self.is_sasl(interbroker_security_protocol) or zk_sasl
 self.has_ssl = self.is_ssl(security_protocol) or 
self.is_ssl(interbroker_security_protocol)
 self.zk_sasl = zk_sasl
+self.static_jaas_conf = static_jaas_conf
 self.properties = {
 'security.protocol' : security_protocol,
 'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
@@ -156,8 +157,14 @@ class SecurityConfig(TemplateRenderer):
 'sasl.kerberos.service.name' : 'kafka'
 }
 
-def client_config(self, template_props=""):
-return SecurityConfig(self.context, self.security_protocol, 
client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
+def client_config(self, template_props="", node=None):
+# If node 

kafka git commit: KAFKA-4580; Use sasl.jaas.config for some system tests

2017-01-17 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/trunk 7a84b241e -> 3f6c4f63c


KAFKA-4580; Use sasl.jaas.config for some system tests

Switched console_consumer, verifiable_consumer and verifiable_producer to use
the new sasl.jaas.config property instead of a static JAAS configuration file
when used with SASL_PLAINTEXT.
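The diff below shows `client_config` gaining a `node` parameter and `SecurityConfig` a `static_jaas_conf` flag. A simplified, hypothetical Python sketch of the decision this enables (not the actual `security_config.py` code) is:

```python
def jaas_mode(node=None, static_jaas_conf=True):
    """Decide how a test client gets its JAAS configuration.

    If no node is given and a static jaas.conf is allowed, the framework
    falls back to the static JAAS file created later during setup.
    Otherwise the JAAS section is rendered into the per-client
    sasl.jaas.config property for that node.
    """
    if node is None and static_jaas_conf:
        return "static-jaas-file"
    return "sasl.jaas.config-property"


# A service that passes its node gets per-client credentials;
# one that does not keeps the old static-file behaviour.
print(jaas_mode())
print(jaas_mode(node="worker1"))
```

The design point is backward compatibility: services not yet migrated keep working with the static file, while migrated services opt in per node.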

Author: Rajini Sivaram 

Reviewers: Ewen Cheslack-Postava , Ismael Juma 


Closes #2323 from rajinisivaram/KAFKA-4580


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f6c4f63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f6c4f63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f6c4f63

Branch: refs/heads/trunk
Commit: 3f6c4f63c9c17424cf717ca76c74554bcf3b2e9a
Parents: 7a84b24
Author: Rajini Sivaram 
Authored: Tue Jan 17 18:42:55 2017 +
Committer: Ismael Juma 
Committed: Tue Jan 17 18:42:55 2017 +

--
 tests/kafkatest/services/console_consumer.py|  4 +--
 .../services/security/security_config.py| 28 +++-
 .../services/security/templates/jaas.conf   |  4 +++
 tests/kafkatest/services/verifiable_consumer.py |  5 ++--
 tests/kafkatest/services/verifiable_producer.py |  9 +++
 5 files changed, 35 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/3f6c4f63/tests/kafkatest/services/console_consumer.py
--
diff --git a/tests/kafkatest/services/console_consumer.py 
b/tests/kafkatest/services/console_consumer.py
index 17ddb6b..cdc46cd 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -150,7 +150,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
 
 # Add security properties to the config. If security protocol is not 
specified,
 # use the default in the template properties.
-self.security_config = 
self.kafka.security_config.client_config(prop_file)
+self.security_config = 
self.kafka.security_config.client_config(prop_file, node)
+self.security_config.setup_node(node)
 
 prop_file += str(self.security_config)
 return prop_file
@@ -231,7 +232,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
 prop_file = self.prop_file(node)
 self.logger.info(prop_file)
 node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
-self.security_config.setup_node(node)
 
 # Create and upload log properties
 log_config = self.render('tools_log4j.properties', 
log_file=ConsoleConsumer.LOG_FILE)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f6c4f63/tests/kafkatest/services/security/security_config.py
--
diff --git a/tests/kafkatest/services/security/security_config.py 
b/tests/kafkatest/services/security/security_config.py
index 864e0a3..846d9b1 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -112,7 +112,7 @@ class SecurityConfig(TemplateRenderer):
 
 def __init__(self, context, security_protocol=None, 
interbroker_security_protocol=None,
  client_sasl_mechanism=SASL_MECHANISM_GSSAPI, 
interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
- zk_sasl=False, template_props=""):
+ zk_sasl=False, template_props="", static_jaas_conf=True):
 """
 Initialize the security properties for the node and copy
 keystore and truststore to the remote node if the transport protocol 
@@ -143,6 +143,7 @@ class SecurityConfig(TemplateRenderer):
 self.has_sasl = self.is_sasl(security_protocol) or 
self.is_sasl(interbroker_security_protocol) or zk_sasl
 self.has_ssl = self.is_ssl(security_protocol) or 
self.is_ssl(interbroker_security_protocol)
 self.zk_sasl = zk_sasl
+self.static_jaas_conf = static_jaas_conf
 self.properties = {
 'security.protocol' : security_protocol,
 'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
@@ -156,8 +157,14 @@ class SecurityConfig(TemplateRenderer):
 'sasl.kerberos.service.name' : 'kafka'
 }
 
-def client_config(self, template_props=""):
-return SecurityConfig(self.context, self.security_protocol, 
client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
+def client_config(self, template_props="", node=None):
+# If node is not specified, use static jaas config which will be 
created later.
+# Otherwise use static JAAS 

kafka git commit: MINOR: Some cleanups and additional testing for KIP-88

2017-01-17 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/trunk 55abe65e0 -> 7a84b241e


MINOR: Some cleanups and additional testing for KIP-88

Author: Jason Gustafson 

Reviewers: Vahid Hashemian , Ismael Juma 


Closes #2383 from hachikuji/minor-cleanup-kip-88


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7a84b241
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7a84b241
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7a84b241

Branch: refs/heads/trunk
Commit: 7a84b241eeb8cb63400a9512b066c3f733f94b8c
Parents: 55abe65
Author: Jason Gustafson 
Authored: Tue Jan 17 10:42:05 2017 -0800
Committer: Jason Gustafson 
Committed: Tue Jan 17 10:42:05 2017 -0800

--
 .../common/requests/OffsetFetchRequest.java | 24 +-
 .../common/requests/OffsetFetchResponse.java| 83 ++-
 .../clients/consumer/KafkaConsumerTest.java |  2 +-
 .../internals/ConsumerCoordinatorTest.java  |  4 +-
 .../common/requests/RequestResponseTest.java| 23 --
 .../scala/kafka/api/OffsetFetchRequest.scala|  1 -
 .../kafka/coordinator/GroupCoordinator.scala|  4 +-
 .../coordinator/GroupMetadataManager.scala  | 31 ---
 .../src/main/scala/kafka/server/KafkaApis.scala | 86 
 .../kafka/api/AuthorizerIntegrationTest.scala   | 37 -
 .../GroupCoordinatorResponseTest.scala  | 58 -
 11 files changed, 211 insertions(+), 142 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 0ff49be..553fd96 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -78,7 +79,7 @@ public class OffsetFetchRequest extends AbstractRequest {
 private final List<TopicPartition> partitions;
 
 public static OffsetFetchRequest forAllPartitions(String groupId) {
-return new OffsetFetchRequest.Builder(groupId, (List<TopicPartition>) null).setVersion((short) 2).build();
+return new OffsetFetchRequest.Builder(groupId, null).setVersion((short) 2).build();
 }
 
 // v0, v1, and v2 have the same fields.
@@ -131,20 +132,35 @@ public class OffsetFetchRequest extends AbstractRequest {
 groupId = struct.getString(GROUP_ID_KEY_NAME);
 }
 
-@Override
-public AbstractResponse getErrorResponse(Throwable e) {
+public OffsetFetchResponse getErrorResponse(Errors error) {
 short versionId = version();
+
+Map<TopicPartition, OffsetFetchResponse.PartitionData> responsePartitions = new HashMap<>();
+if (versionId < 2) {
+for (TopicPartition partition : this.partitions) {
+responsePartitions.put(partition, new 
OffsetFetchResponse.PartitionData(
+OffsetFetchResponse.INVALID_OFFSET,
+OffsetFetchResponse.NO_METADATA,
+error));
+}
+}
+
 switch (versionId) {
 case 0:
 case 1:
 case 2:
-return new OffsetFetchResponse(Errors.forException(e), 
partitions, versionId);
+return new OffsetFetchResponse(error, responsePartitions, 
versionId);
 default:
 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
 versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id)));
 }
 }
 
+@Override
+public OffsetFetchResponse getErrorResponse(Throwable e) {
+return getErrorResponse(Errors.forException(e));
+}
+
 public String groupId() {
 return groupId;
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 0095f38..9c14155 100644
--- 

[1/2] kafka git commit: KAFKA-4363; Documentation for sasl.jaas.config property

2017-01-17 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 7946b2f35 -> e3f4cdd0e


KAFKA-4363; Documentation for sasl.jaas.config property

Author: Rajini Sivaram 

Reviewers: Ismael Juma 

Closes #2316 from rajinisivaram/KAFKA-4363


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/621dff22
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/621dff22
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/621dff22

Branch: refs/heads/0.10.2
Commit: 621dff22e79dc64b9a8748186dd985774044f91a
Parents: 7946b2f
Author: Rajini Sivaram 
Authored: Tue Jan 17 11:16:29 2017 +
Committer: Ismael Juma 
Committed: Tue Jan 17 12:56:37 2017 +

--
 docs/security.html | 147 
 1 file changed, 99 insertions(+), 48 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/621dff22/docs/security.html
--
diff --git a/docs/security.html b/docs/security.html
index 350f0cf..8cb867e 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -263,23 +263,65 @@
 SASL configuration for Kafka 
clients
 SASL authentication is only supported for the new Java Kafka producer 
and
-consumer, the older API is not supported. To configure SASL 
authentication
-on the clients:
+consumer, the older API is not supported. JAAS configuration for 
clients may
+be specified as a static JAAS config file or using the client 
configuration property
+sasl.jaas.config.
+To configure SASL authentication on the clients:
 
 Select a SASL mechanism for authentication.
-Add a JAAS config file for the selected mechanism as described in 
the examples
-for setting up GSSAPI (Kerberos)
-or PLAIN. 
KafkaClient is the
-section name in the JAAS file used by Kafka clients.
-Pass the JAAS config file location as JVM parameter to each client 
JVM. For example:
-
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
 Configure the following properties in producer.properties or
 consumer.properties:
 security.protocol=SASL_PLAINTEXT (or SASL_SSL)
-sasl.mechanism=GSSAPI (or PLAIN)
+sasl.mechanism=GSSAPI (or PLAIN)
 Follow the steps in GSSAPI (Kerberos)
 or PLAIN to 
configure SASL
 for the selected mechanism.
+Configure JAAS using client 
configuration property
+or static JAAS config 
file as described below.
+
+
+JAAS configuration using client 
configuration property
+Clients may specify JAAS configuration as a producer or 
consumer property without
+creating a physical configuration file. This mode also enables 
different producers
+and consumers within the same JVM to use different credentials by 
specifying
+different properties for each client. If both static JAAS 
configuration system property
+java.security.auth.login.config and client property 
sasl.jaas.config
+are specified, the client property will be used.
+
+To configure SASL authentication on the clients using 
configuration property:
+
+Configure the property sasl.jaas.config in 
producer.properties or
+consumer.properties to be the JAAS login module section of the 
selected mechanism.
+For example, PLAIN
+credentials may be configured as:
+
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required username="alice" password="alice-secret";
+See GSSAPI 
(Kerberos) or PLAIN
+for full example configurations.
+
+
+JAAS configuration using static config 
file
+To configure SASL authentication on the clients using static JAAS 
config file:
+
+Add a JAAS config file with a client login section named 
KafkaClient. Configure
+a login module in KafkaClient for the selected 
mechanism as described in the examples
+for setting up GSSAPI (Kerberos)
+or PLAIN.
+For example, GSSAPI
+credentials may be configured as:
+
+KafkaClient {
+com.sun.security.auth.module.Krb5LoginModule required
+useKeyTab=true
+storeKey=true
+keyTab="/etc/security/keytabs/kafka_client.keytab"
+principal="kafka-clien...@example.com";
+};
+See GSSAPI 
(Kerberos) or PLAIN
+for 

kafka git commit: KAFKA-4590; SASL/SCRAM system tests

2017-01-17 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/trunk b4d8668d6 -> 55abe65e0


KAFKA-4590; SASL/SCRAM system tests

Runs sanity test and one replication test using SASL/SCRAM.
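SCRAM credentials live in ZooKeeper, so the tests must create them before clients (and, for inter-broker auth, before brokers) can authenticate; the diff below adds a `setup_credentials` step for exactly this. Against a live cluster the equivalent manual step uses the kafka-configs tool, e.g. (user name and password are placeholders):

```shell
# Create SCRAM-SHA-256 credentials for a client user in ZooKeeper
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
    --add-config 'SCRAM-SHA-256=[password=client-secret]' \
    --entity-type users --entity-name kafka-client
```

Note below that inter-broker credentials are created before Kafka starts while client credentials are created after, so the tests cover both loading credentials from ZK at startup and picking up dynamic updates.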

Author: Rajini Sivaram 

Reviewers: Ewen Cheslack-Postava , Ismael Juma 


Closes #2355 from rajinisivaram/KAFKA-4590


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/55abe65e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/55abe65e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/55abe65e

Branch: refs/heads/trunk
Commit: 55abe65e0996008d54bd5bb8440906ac4a359937
Parents: b4d8668
Author: Rajini Sivaram 
Authored: Tue Jan 17 12:55:07 2017 +
Committer: Ismael Juma 
Committed: Tue Jan 17 12:55:07 2017 +

--
 .../sanity_checks/test_console_consumer.py  |  2 +-
 tests/kafkatest/services/kafka/kafka.py |  6 +
 .../services/security/security_config.py| 25 
 .../services/security/templates/jaas.conf   |  9 +++
 tests/kafkatest/tests/core/replication_test.py  |  6 -
 5 files changed, 46 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/55abe65e/tests/kafkatest/sanity_checks/test_console_consumer.py
--
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py 
b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 38db057..066d6d4 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -47,7 +47,7 @@ class ConsoleConsumerTest(Test):
 @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
 @matrix(security_protocol=['PLAINTEXT', 'SSL'])
 @cluster(num_nodes=4)
-@parametrize(security_protocol='SASL_SSL', sasl_mechanism='PLAIN')
+@matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 
'SCRAM-SHA-256', 'SCRAM-SHA-512'])
 @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
 def test_lifecycle(self, security_protocol, new_consumer=True, 
sasl_mechanism='GSSAPI'):
 """Check that console consumer starts/stops properly, and that we are 
capturing log output."""

http://git-wip-us.apache.org/repos/asf/kafka/blob/55abe65e/tests/kafkatest/services/kafka/kafka.py
--
diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index 716c2d2..8ef0f35 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -208,6 +208,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
 node.account.create_file(self.LOG4J_CONFIG, 
self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR))
 
 self.security_config.setup_node(node)
+self.security_config.setup_credentials(node, self.path, 
self.zk.connect_setting(), broker=True)
 
 cmd = self.start_cmd(node)
 self.logger.debug("Attempting to start KafkaService on %s with 
command: %s" % (str(node.account), cmd))
@@ -215,6 +216,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
 node.account.ssh(cmd)
 monitor.wait_until("Kafka Server.*started", timeout_sec=30, 
backoff_sec=.25, err_msg="Kafka server didn't finish startup")
 
+# Credentials for inter-broker communication are created before 
starting Kafka.
+# Client credentials are created after starting Kafka so that both 
loading of
+# existing credentials from ZK and dynamic update of credentials in 
Kafka are tested.
+self.security_config.setup_credentials(node, self.path, 
self.zk.connect_setting(), broker=False)
+
 self.start_jmx_tool(self.idx(node), node)
 if len(self.pids(node)) == 0:
 raise Exception("No process ids recorded on node %s" % str(node))

http://git-wip-us.apache.org/repos/asf/kafka/blob/55abe65e/tests/kafkatest/services/security/security_config.py
--
diff --git a/tests/kafkatest/services/security/security_config.py 
b/tests/kafkatest/services/security/security_config.py
index 9b29217..864e0a3 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -94,6 +94,12 @@ class SecurityConfig(TemplateRenderer):
 SASL_SSL = 'SASL_SSL'
 SASL_MECHANISM_GSSAPI = 'GSSAPI'
 SASL_MECHANISM_PLAIN = 'PLAIN'
+SASL_MECHANISM_SCRAM_SHA_256 = 'SCRAM-SHA-256'
+SASL_MECHANISM_SCRAM_SHA_512 = 'SCRAM-SHA-512'
+SCRAM_CLIENT_USER = "kafka-client"
+

kafka git commit: KAFKA-4363; Documentation for sasl.jaas.config property

2017-01-17 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/trunk b62804a25 -> b4d8668d6


KAFKA-4363; Documentation for sasl.jaas.config property

Author: Rajini Sivaram 

Reviewers: Ismael Juma 

Closes #2316 from rajinisivaram/KAFKA-4363


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b4d8668d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b4d8668d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b4d8668d

Branch: refs/heads/trunk
Commit: b4d8668d6d47fca8cc6de4f550d703cd8926c436
Parents: b62804a
Author: Rajini Sivaram 
Authored: Tue Jan 17 11:16:29 2017 +
Committer: Ismael Juma 
Committed: Tue Jan 17 11:16:29 2017 +

--
 docs/security.html | 147 
 1 file changed, 99 insertions(+), 48 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/b4d8668d/docs/security.html
--
diff --git a/docs/security.html b/docs/security.html
index 350f0cf..8cb867e 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -263,23 +263,65 @@
 SASL configuration for Kafka 
clients
 SASL authentication is only supported for the new Java Kafka producer 
and
-consumer, the older API is not supported. To configure SASL 
authentication
-on the clients:
+consumer, the older API is not supported. JAAS configuration for 
clients may
+be specified as a static JAAS config file or using the client 
configuration property
+sasl.jaas.config.
+To configure SASL authentication on the clients:
 
 Select a SASL mechanism for authentication.
-Add a JAAS config file for the selected mechanism as described in 
the examples
-for setting up GSSAPI (Kerberos)
-or PLAIN. 
KafkaClient is the
-section name in the JAAS file used by Kafka clients.
-Pass the JAAS config file location as JVM parameter to each client 
JVM. For example:
-
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
 Configure the following properties in producer.properties or
 consumer.properties:
 security.protocol=SASL_PLAINTEXT (or SASL_SSL)
-sasl.mechanism=GSSAPI (or PLAIN)
+sasl.mechanism=GSSAPI (or PLAIN)
 Follow the steps in GSSAPI (Kerberos)
 or PLAIN to 
configure SASL
 for the selected mechanism.
+Configure JAAS using client 
configuration property
+or static JAAS config 
file as described below.
+
+
+JAAS configuration using client 
configuration property
+Clients may specify JAAS configuration as a producer or 
consumer property without
+creating a physical configuration file. This mode also enables 
different producers
+and consumers within the same JVM to use different credentials by 
specifying
+different properties for each client. If both static JAAS 
configuration system property
+java.security.auth.login.config and client property 
sasl.jaas.config
+are specified, the client property will be used.
+
+To configure SASL authentication on the clients using 
configuration property:
+
+Configure the property sasl.jaas.config in 
producer.properties or
+consumer.properties to be the JAAS login module section of the 
selected mechanism.
+For example, PLAIN
+credentials may be configured as:
+
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required username="alice" password="alice-secret";
+See GSSAPI 
(Kerberos) or PLAIN
+for full example configurations.
+
+
+JAAS configuration using static config 
file
+To configure SASL authentication on the clients using static JAAS 
config file:
+
+Add a JAAS config file with a client login section named 
KafkaClient. Configure
+a login module in KafkaClient for the selected 
mechanism as described in the examples
+for setting up GSSAPI (Kerberos)
+or PLAIN.
+For example, GSSAPI
+credentials may be configured as:
+
+KafkaClient {
+com.sun.security.auth.module.Krb5LoginModule required
+useKeyTab=true
+storeKey=true
+keyTab="/etc/security/keytabs/kafka_client.keytab"
+principal="kafka-clien...@example.com";
+};
+See GSSAPI 
(Kerberos) or PLAIN
+for