kafka git commit: KAFKA-3502; move RocksDB options construction to init()
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 eb62e5695 -> e38794e02

KAFKA-3502; move RocksDB options construction to init()

In RocksDBStore, options / wOptions / fOptions are constructed in the constructor and need to be released in the close() call. However, in some tests the generated topology is not initialized at all, so the corresponding state stores are not expected to be closed either, since their `init` function is never called. This could leave the above option objects unreleased.

This patch fixes the problem by moving the construction logic out of the constructor and into the `init` functions, so that no RocksDB objects are created in the constructor.

Also some minor cleanups:

1. In KStreamTestDriver.close(), we lost the logic to close the state stores and only called `flush`; it is now changed back to call both.
2. Moved the forwarding logic from KStreamTestDriver to MockProcessorContext to remove the mutual dependency: these functions should really be in ProcessorContext, not the test driver.

Author: Guozhang Wang
Reviewers: Damian Guy, Matthias J. Sax, Jason Gustafson

Closes #2381 from guozhangwang/K3502-pure-virtual-function-unit-tests

(cherry picked from commit 1974e1b0e54abe5fdebd8ff3338df864b7ab60f3)
Signed-off-by: Jason Gustafson

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e38794e0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e38794e0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e38794e0

Branch: refs/heads/0.10.2
Commit: e38794e020951adec5a5d0bbfe42c57294bf67bd
Parents: eb62e56
Author: Guozhang Wang
Authored: Tue Jan 17 20:29:55 2017 -0800
Committer: Jason Gustafson
Committed: Tue Jan 17 20:37:00 2017 -0800

--
 .../streams/state/internals/RocksDBStore.java | 21 +---
 .../streams/kstream/KStreamBuilderTest.java | 58 +-
 .../internals/KStreamKTableLeftJoinTest.java | 2 -
 ...reamSessionWindowAggregateProcessorTest.java | 4 +-
 .../streams/state/KeyValueStoreTestDriver.java | 2 +-
 .../internals/CachingKeyValueStoreTest.java | 2 +-
 .../internals/CachingSessionStoreTest.java | 2 +-
 .../state/internals/CachingWindowStoreTest.java | 2 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java | 3 +-
 .../ChangeLoggingKeyValueStoreTest.java | 3 +-
 .../ChangeLoggingSegmentedBytesStoreTest.java | 3 +-
 .../MeteredSegmentedBytesStoreTest.java | 3 +-
 .../RocksDBKeyValueStoreSupplierTest.java | 9 +-
 .../RocksDBSegmentedBytesStoreTest.java | 3 +-
 .../RocksDBSessionStoreSupplierTest.java | 9 +-
 .../internals/RocksDBSessionStoreTest.java | 3 +-
 .../RocksDBWindowStoreSupplierTest.java | 9 +-
 .../state/internals/RocksDBWindowStoreTest.java | 24 ++--
 .../state/internals/SegmentIteratorTest.java | 27 +++--
 .../streams/state/internals/SegmentsTest.java | 3 +-
 .../apache/kafka/test/KStreamTestDriver.java | 113 +--
 .../apache/kafka/test/MockProcessorContext.java | 57 +++---
 22 files changed, 153 insertions(+), 209 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/e38794e0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 3f8d509..55c1bb8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -94,25 +94,21 @@ public class RocksDBStore implements KeyValueStore {
     protected volatile boolean open = false;
-
-    public RocksDBStore(final String name,
-        final Serde keySerde,
-        final Serde valueSerde) {
+    RocksDBStore(String name, Serde keySerde, Serde valueSerde) {
         this(name, DB_FILE_DIR, keySerde, valueSerde);
     }
-
-    public RocksDBStore(final String name,
-        final String parentDir,
-        final Serde keySerde,
-        final Serde valueSerde) {
+    RocksDBStore(String name, String parentDir, Serde keySerde, Serde valueSerde) {
         this.name = name;
         this.parentDir = parentDir;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
+    }
+    @SuppressWarnings("unchecked")
+    public void openDB(ProcessorContext context) {
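
The lifecycle problem described in this commit message can be illustrated with a minimal, self-contained sketch. `NativeOptions` and `LazyStore` are hypothetical stand-ins, not RocksDB or Kafka Streams classes; the counter only simulates native handles that must be released explicitly. Under that assumption, a store that is constructed but never initialized holds nothing that needs closing:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Demo entry point; declared first so that single-file launch picks it up.
class LazyInitDemo {
    public static void main(String[] args) {
        // Constructed but never initialized, as in tests that skip topology init.
        LazyStore neverInitialized = new LazyStore("never-initialized");
        System.out.println(neverInitialized.name() + ": live handles = " + NativeOptions.LIVE.get());

        LazyStore store = new LazyStore("initialized");
        store.init();
        System.out.println(store.name() + ": live handles after init() = " + NativeOptions.LIVE.get());
        store.close();
        System.out.println(store.name() + ": live handles after close() = " + NativeOptions.LIVE.get());
    }
}

// Hypothetical stand-in for a native-backed object such as an options handle.
final class NativeOptions implements AutoCloseable {
    // Number of simulated native handles currently allocated.
    static final AtomicInteger LIVE = new AtomicInteger();

    NativeOptions() {
        LIVE.incrementAndGet();
    }

    @Override
    public void close() {
        LIVE.decrementAndGet();
    }
}

// Hypothetical store: the constructor records plain fields only;
// native resources are allocated in init() and released in close().
final class LazyStore implements AutoCloseable {
    private final String name;
    private NativeOptions options; // stays null until init() is called

    LazyStore(String name) {
        this.name = name;
    }

    void init() {
        options = new NativeOptions();
    }

    @Override
    public void close() {
        if (options != null) {
            options.close();
            options = null;
        }
    }

    String name() {
        return name;
    }
}
```

The design choice mirrored here is that allocation and release are paired inside the init()/close() lifecycle, so skipping both leaves nothing behind.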
kafka git commit: KAFKA-3502; move RocksDB options construction to init()
Repository: kafka
Updated Branches:
  refs/heads/trunk fd6d7bcf3 -> 1974e1b0e

KAFKA-3502; move RocksDB options construction to init()

In RocksDBStore, options / wOptions / fOptions are constructed in the constructor and need to be released in the close() call. However, in some tests the generated topology is not initialized at all, so the corresponding state stores are not expected to be closed either, since their `init` function is never called. This could leave the above option objects unreleased.

This patch fixes the problem by moving the construction logic out of the constructor and into the `init` functions, so that no RocksDB objects are created in the constructor.

Also some minor cleanups:

1. In KStreamTestDriver.close(), we lost the logic to close the state stores and only called `flush`; it is now changed back to call both.
2. Moved the forwarding logic from KStreamTestDriver to MockProcessorContext to remove the mutual dependency: these functions should really be in ProcessorContext, not the test driver.

Author: Guozhang Wang
Reviewers: Damian Guy, Matthias J. Sax, Jason Gustafson

Closes #2381 from guozhangwang/K3502-pure-virtual-function-unit-tests

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1974e1b0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1974e1b0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1974e1b0

Branch: refs/heads/trunk
Commit: 1974e1b0e54abe5fdebd8ff3338df864b7ab60f3
Parents: fd6d7bc
Author: Guozhang Wang
Authored: Tue Jan 17 20:29:55 2017 -0800
Committer: Jason Gustafson
Committed: Tue Jan 17 20:31:31 2017 -0800

--
 .../streams/state/internals/RocksDBStore.java | 21 +---
 .../streams/kstream/KStreamBuilderTest.java | 58 +-
 .../internals/KStreamKTableLeftJoinTest.java | 2 -
 ...reamSessionWindowAggregateProcessorTest.java | 4 +-
 .../streams/state/KeyValueStoreTestDriver.java | 2 +-
 .../internals/CachingKeyValueStoreTest.java | 2 +-
 .../internals/CachingSessionStoreTest.java | 2 +-
 .../state/internals/CachingWindowStoreTest.java | 2 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java | 3 +-
 .../ChangeLoggingKeyValueStoreTest.java | 3 +-
 .../ChangeLoggingSegmentedBytesStoreTest.java | 3 +-
 .../MeteredSegmentedBytesStoreTest.java | 3 +-
 .../RocksDBKeyValueStoreSupplierTest.java | 9 +-
 .../RocksDBSegmentedBytesStoreTest.java | 3 +-
 .../RocksDBSessionStoreSupplierTest.java | 9 +-
 .../internals/RocksDBSessionStoreTest.java | 3 +-
 .../RocksDBWindowStoreSupplierTest.java | 9 +-
 .../state/internals/RocksDBWindowStoreTest.java | 24 ++--
 .../state/internals/SegmentIteratorTest.java | 27 +++--
 .../streams/state/internals/SegmentsTest.java | 3 +-
 .../apache/kafka/test/KStreamTestDriver.java | 113 +--
 .../apache/kafka/test/MockProcessorContext.java | 57 +++---
 22 files changed, 153 insertions(+), 209 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/1974e1b0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 3f8d509..55c1bb8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -94,25 +94,21 @@ public class RocksDBStore implements KeyValueStore {
     protected volatile boolean open = false;
-
-    public RocksDBStore(final String name,
-        final Serde keySerde,
-        final Serde valueSerde) {
+    RocksDBStore(String name, Serde keySerde, Serde valueSerde) {
         this(name, DB_FILE_DIR, keySerde, valueSerde);
     }
-
-    public RocksDBStore(final String name,
-        final String parentDir,
-        final Serde keySerde,
-        final Serde valueSerde) {
+    RocksDBStore(String name, String parentDir, Serde keySerde, Serde valueSerde) {
         this.name = name;
         this.parentDir = parentDir;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
+    }
+    @SuppressWarnings("unchecked")
+    public void openDB(ProcessorContext context) {
         // initialize the default rocksdb options
-        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+
kafka git commit: KAFKA-4591; Create Topic Policy follow-up
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 6f72a5a53 -> eb62e5695

KAFKA-4591; Create Topic Policy follow-up

1. Added javadoc to public classes
2. Removed `s` from config name for consistency with interface name
3. The policy interface now implements Configurable and AutoCloseable as per the KIP
4. Use `null` instead of `-1` in `RequestMetadata`
5. Perform all broker validation before invoking the policy
6. Add tests

Author: Ismael Juma
Reviewers: Jason Gustafson

Closes #2388 from ijuma/create-topic-policy-docs-and-config-name-change

(cherry picked from commit fd6d7bcf335166a524dc9a29a50c96af8f1c1c02)
Signed-off-by: Ismael Juma

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eb62e569
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eb62e569
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eb62e569

Branch: refs/heads/0.10.2
Commit: eb62e5695506ae13bd37102c3c08e8a067eca0c8
Parents: 6f72a5a
Author: Ismael Juma
Authored: Wed Jan 18 02:43:10 2017 +
Committer: Ismael Juma
Committed: Wed Jan 18 02:43:37 2017 +

--
 .../common/errors/PolicyViolationException.java | 3 +
 .../apache/kafka/common/protocol/Errors.java | 2 +-
 .../kafka/server/policy/CreateTopicPolicy.java | 72 ++--
 .../src/main/scala/kafka/admin/AdminUtils.scala | 36 ++
 .../main/scala/kafka/server/AdminManager.scala | 56 ++-
 .../main/scala/kafka/server/KafkaConfig.scala | 6 +-
 .../AbstractCreateTopicsRequestTest.scala | 2 +-
 .../CreateTopicsRequestWithPolicyTest.scala | 59 +---
 .../unit/kafka/server/KafkaConfigTest.scala | 2 +-
 9 files changed, 183 insertions(+), 55 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb62e569/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java b/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
index 7923444..393a6df 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
@@ -17,6 +17,9 @@ package org.apache.kafka.common.errors;
+/**
+ * Exception thrown if a create topics request does not satisfy the configured policy for a topic.
+ */
 public class PolicyViolationException extends ApiException {
     public PolicyViolationException(String message) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb62e569/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index f30f889..e7689e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -166,7 +166,7 @@ public enum Errors {
         " the message was sent to an incompatible broker. See the broker logs for more details.")),
     UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
         new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")),
-    POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the system policy."));
+    POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the configured policy."));
     private static final Logger log = LoggerFactory.getLogger(Errors.class);

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb62e569/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
--
diff --git a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
index 94f1e76..22a7c1d 100644
--- a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
+++ b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
@@ -13,42 +13,90 @@ package org.apache.kafka.server.policy;
+import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.errors.PolicyViolationException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-public interface CreateTopicPolicy {
+/**
+ * An interface for enforcing a policy on create topics requests.
+ *
+ *
kafka git commit: KAFKA-4591; Create Topic Policy follow-up
Repository: kafka
Updated Branches:
  refs/heads/trunk e3bdc84d8 -> fd6d7bcf3

KAFKA-4591; Create Topic Policy follow-up

1. Added javadoc to public classes
2. Removed `s` from config name for consistency with interface name
3. The policy interface now implements Configurable and AutoCloseable as per the KIP
4. Use `null` instead of `-1` in `RequestMetadata`
5. Perform all broker validation before invoking the policy
6. Add tests

Author: Ismael Juma
Reviewers: Jason Gustafson

Closes #2388 from ijuma/create-topic-policy-docs-and-config-name-change

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fd6d7bcf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fd6d7bcf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fd6d7bcf

Branch: refs/heads/trunk
Commit: fd6d7bcf335166a524dc9a29a50c96af8f1c1c02
Parents: e3bdc84
Author: Ismael Juma
Authored: Wed Jan 18 02:43:10 2017 +
Committer: Ismael Juma
Committed: Wed Jan 18 02:43:10 2017 +

--
 .../common/errors/PolicyViolationException.java | 3 +
 .../apache/kafka/common/protocol/Errors.java | 2 +-
 .../kafka/server/policy/CreateTopicPolicy.java | 72 ++--
 .../src/main/scala/kafka/admin/AdminUtils.scala | 36 ++
 .../main/scala/kafka/server/AdminManager.scala | 56 ++-
 .../main/scala/kafka/server/KafkaConfig.scala | 6 +-
 .../AbstractCreateTopicsRequestTest.scala | 2 +-
 .../CreateTopicsRequestWithPolicyTest.scala | 59 +---
 .../unit/kafka/server/KafkaConfigTest.scala | 2 +-
 9 files changed, 183 insertions(+), 55 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java b/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
index 7923444..393a6df 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
@@ -17,6 +17,9 @@ package org.apache.kafka.common.errors;
+/**
+ * Exception thrown if a create topics request does not satisfy the configured policy for a topic.
+ */
 public class PolicyViolationException extends ApiException {
     public PolicyViolationException(String message) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index f30f889..e7689e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -166,7 +166,7 @@ public enum Errors {
         " the message was sent to an incompatible broker. See the broker logs for more details.")),
     UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
         new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")),
-    POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the system policy."));
+    POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the configured policy."));
     private static final Logger log = LoggerFactory.getLogger(Errors.class);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
--
diff --git a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
index 94f1e76..22a7c1d 100644
--- a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
+++ b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
@@ -13,42 +13,90 @@ package org.apache.kafka.server.policy;
+import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.errors.PolicyViolationException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-public interface CreateTopicPolicy {
+/**
+ * An interface for enforcing a policy on create topics requests.
+ *
+ * Common use cases are requiring that the replication factor, min.insync.replicas and/or retention settings for a
+ *
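
As a rough illustration of the shape described in items 3 and 4 of the commit message (a policy that is configurable, closeable, and sees `null` for unspecified values), here is a minimal sketch. It deliberately does not use the Kafka classes: `TopicPolicy`, `TopicRequest`, `PolicyViolation`, `MinReplicationPolicy` and the config key `min.replication.factor` are all hypothetical stand-ins chosen for this example, and the real interface may differ in its method signatures.

```java
import java.util.Collections;
import java.util.Map;

// Demo entry point; declared first so that single-file launch picks it up.
class MinReplicationPolicyDemo {
    public static void main(String[] args) {
        try (MinReplicationPolicy policy = new MinReplicationPolicy()) {
            policy.configure(Collections.singletonMap(MinReplicationPolicy.MIN_RF_CONFIG, "3"));
            policy.validate(new TopicRequest("accepted-topic", (short) 3));
            try {
                policy.validate(new TopicRequest("rejected-topic", (short) 1));
            } catch (PolicyViolation e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }
}

// Stand-in for a policy-violation exception (hypothetical, unchecked).
class PolicyViolation extends RuntimeException {
    PolicyViolation(String message) {
        super(message);
    }
}

// Stand-in request metadata; replicationFactor is null when the request
// did not specify one (mirroring "use null instead of -1").
final class TopicRequest {
    final String topic;
    final Short replicationFactor;

    TopicRequest(String topic, Short replicationFactor) {
        this.topic = topic;
        this.replicationFactor = replicationFactor;
    }
}

// Stand-in policy interface: configured once, validated per request, closed at shutdown.
interface TopicPolicy extends AutoCloseable {
    void configure(Map<String, ?> configs);

    void validate(TopicRequest request);

    @Override
    void close();
}

// Example policy: reject requests whose explicit replication factor is too low.
final class MinReplicationPolicy implements TopicPolicy {
    static final String MIN_RF_CONFIG = "min.replication.factor"; // hypothetical key

    private short minReplicationFactor = 1;

    @Override
    public void configure(Map<String, ?> configs) {
        Object value = configs.get(MIN_RF_CONFIG);
        if (value != null) {
            minReplicationFactor = Short.parseShort(value.toString());
        }
    }

    @Override
    public void validate(TopicRequest request) {
        Short rf = request.replicationFactor;
        // A null replication factor means "not specified", so there is nothing to check.
        if (rf != null && rf < minReplicationFactor) {
            throw new PolicyViolation("Topic " + request.topic + " needs replication factor >= "
                + minReplicationFactor + " but requested " + rf);
        }
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```

Representing "not specified" as `null` rather than `-1` forces the policy to handle the missing case explicitly instead of comparing against a sentinel.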
kafka git commit: MINOR: remove ZK from system tests
Repository: kafka
Updated Branches:
  refs/heads/trunk 73b7ae001 -> e3bdc84d8

MINOR: remove ZK from system tests

Author: Matthias J. Sax
Reviewers: Guozhang Wang

Closes #2391 from mjsax/kafka-4060-zk-follow-up-system-tests

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e3bdc84d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e3bdc84d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e3bdc84d

Branch: refs/heads/trunk
Commit: e3bdc84d864a5ca655f985473770f7b7cb24b79b
Parents: 73b7ae0
Author: Matthias J. Sax
Authored: Tue Jan 17 18:14:43 2017 -0800
Committer: Guozhang Wang
Committed: Tue Jan 17 18:14:43 2017 -0800

--
 .../kafka/streams/perf/SimpleBenchmark.java | 26 +---
 .../streams/smoketest/ShutdownDeadlockTest.java | 7 +-
 .../streams/smoketest/SmokeTestClient.java | 8 +++---
 .../streams/smoketest/SmokeTestDriver.java | 8 +++---
 .../streams/smoketest/StreamsSmokeTest.java | 10 +++-
 5 files changed, 23 insertions(+), 36 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3bdc84d/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
--
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 7ba6161..fb26206 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -57,7 +57,6 @@ import java.util.Random;
 public class SimpleBenchmark {
     private final String kafka;
-    private final String zookeeper;
     private final File stateDir;
     private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
@@ -91,18 +90,16 @@ public class SimpleBenchmark {
     private static final Serde BYTE_SERDE = Serdes.ByteArray();
     private static final Serde INTEGER_SERDE = Serdes.Integer();
-    public SimpleBenchmark(File stateDir, String kafka, String zookeeper) {
+    public SimpleBenchmark(File stateDir, String kafka) {
         super();
         this.stateDir = stateDir;
         this.kafka = kafka;
-        this.zookeeper = zookeeper;
     }
     public static void main(String[] args) throws Exception {
         String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        String zookeeper = args.length > 1 ? args[1] : "localhost:2181";
-        String stateDirStr = args.length > 2 ? args[2] : "/tmp/kafka-streams-simple-benchmark";
-        numRecords = args.length > 3 ? Integer.parseInt(args[3]) : 1000;
+        String stateDirStr = args.length > 1 ? args[1] : "/tmp/kafka-streams-simple-benchmark";
+        numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
         endKey = numRecords - 1;
         final File stateDir = new File(stateDirStr);
@@ -113,11 +110,10 @@ public class SimpleBenchmark {
         // Note: this output is needed for automated tests and must not be removed
         System.out.println("SimpleBenchmark instance started");
         System.out.println("kafka=" + kafka);
-        System.out.println("zookeeper=" + zookeeper);
         System.out.println("stateDir=" + stateDir);
         System.out.println("numRecords=" + numRecords);
-        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper);
+        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka);
         // producer performance
         benchmark.produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
@@ -239,7 +235,7 @@ public class SimpleBenchmark {
     public void processStream(String topic) {
         CountDownLatch latch = new CountDownLatch(1);
-        final KafkaStreams streams = createKafkaStreams(topic, stateDir, kafka, zookeeper, latch);
+        final KafkaStreams streams = createKafkaStreams(topic, stateDir, kafka, latch);
         Thread thread = new Thread() {
             public void run() {
@@ -273,7 +269,7 @@ public class SimpleBenchmark {
     public void processStreamWithSink(String topic) {
         CountDownLatch latch = new CountDownLatch(1);
-        final KafkaStreams streams = createKafkaStreamsWithSink(topic, stateDir, kafka, zookeeper, latch);
+        final KafkaStreams streams = createKafkaStreamsWithSink(topic, stateDir, kafka, latch);
         Thread thread = new Thread() {
             public void run() {
@@ -337,7 +333,7 @@ public class SimpleBenchmark {
     public void processStreamWithStateStore(String topic) {
         CountDownLatch latch = new
[1/2] kafka git commit: KAFKA-3452 Follow-up: Refactoring StateStore hierarchies
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 c9b9acf6a -> 6f72a5a53

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 84f1734..abaaffd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.WindowStore;
@@ -58,23 +59,35 @@ public class RocksDBWindowStoreSupplierextends AbstractStoreSupplier segmentedStore = new RocksDBWindowStore<>(name, retainDuplicates, keySerde, valueSerde,
-            logged ? new ChangeLoggingSegmentedBytesStore(bytesStore)
-            : bytesStore);
-        return new MeteredWindowStore<>(segmentedStore, "rocksdb-window", time);
-    }
+        return maybeWrapCaching(
+            maybeWrapLogged(
+                new RocksDBSegmentedBytesStore(
+                    name,
+                    retentionPeriod,
+                    numSegments,
+                    new WindowStoreKeySchema()
+                )));
-        return new CachingWindowStore<>(new MeteredSegmentedBytesStore(logged ? new ChangeLoggingSegmentedBytesStore(bytesStore)
-            : bytesStore,
-            "rocksdb-window",
-            time),
-            keySerde, valueSerde, windowSize);
     }
     @Override
     public long retentionPeriod() {
         return retentionPeriod;
     }
+
+    private SegmentedBytesStore maybeWrapLogged(final SegmentedBytesStore inner) {
+        if (!logged) {
+            return inner;
+        }
+        return new ChangeLoggingSegmentedBytesStore(inner);
+    }
+
+    private WindowStore maybeWrapCaching(final SegmentedBytesStore inner) {
+        final MeteredSegmentedBytesStore metered = new MeteredSegmentedBytesStore(inner, "rocksdb-window", time);
+        if (!enableCaching) {
+            return new RocksDBWindowStore<>(metered, keySerde, valueSerde, retainDuplicates);
+        }
+        final RocksDBWindowStore windowed = RocksDBWindowStore.bytesStore(metered, retainDuplicates);
+        return new CachingWindowStore<>(windowed, keySerde, valueSerde, windowSize);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f72a5a5/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
new file mode 100644
index 000..d76e8a4
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
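
The `maybeWrapLogged` / `maybeWrapCaching` helpers in the diff above follow a conditional decorator pattern: each optional layer either wraps the inner store or passes it through. A simplified sketch of that idea follows; the `Store` interface and the four store classes are hypothetical stand-ins, and the flags are passed as parameters here rather than read from fields as in the patch.

```java
// Demo entry point; declared first so that single-file launch picks it up.
class StoreWrappingDemo {
    public static void main(String[] args) {
        System.out.println(build(true, true).describe());
        System.out.println(build(false, false).describe());
    }

    // Compose the layers from the inside out, as the supplier in the diff does.
    static Store build(boolean logged, boolean cachingEnabled) {
        return maybeWrapCaching(maybeWrapLogged(new BaseStore(), logged), cachingEnabled);
    }

    // Add the change-logging layer only when logging is enabled.
    static Store maybeWrapLogged(Store inner, boolean logged) {
        if (!logged) {
            return inner;
        }
        return new LoggingStore(inner);
    }

    // Metering is always applied; caching is layered on top only when enabled.
    static Store maybeWrapCaching(Store inner, boolean cachingEnabled) {
        Store metered = new MeteredStore(inner);
        if (!cachingEnabled) {
            return metered;
        }
        return new CachingStore(metered);
    }
}

// Hypothetical minimal store abstraction used only for this illustration.
interface Store {
    String describe();
}

final class BaseStore implements Store {
    @Override
    public String describe() {
        return "rocksdb";
    }
}

final class LoggingStore implements Store {
    private final Store inner;

    LoggingStore(Store inner) {
        this.inner = inner;
    }

    @Override
    public String describe() {
        return "logging(" + inner.describe() + ")";
    }
}

final class MeteredStore implements Store {
    private final Store inner;

    MeteredStore(Store inner) {
        this.inner = inner;
    }

    @Override
    public String describe() {
        return "metered(" + inner.describe() + ")";
    }
}

final class CachingStore implements Store {
    private final Store inner;

    CachingStore(Store inner) {
        this.inner = inner;
    }

    @Override
    public String describe() {
        return "caching(" + inner.describe() + ")";
    }
}
```

Keeping each decision in its own small helper avoids the nested ternaries that the removed lines in the diff relied on.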
[2/2] kafka git commit: KAFKA-3452 Follow-up: Refactoring StateStore hierarchies
KAFKA-3452 Follow-up: Refactoring StateStore hierarchies

This is a follow up of https://github.com/apache/kafka/pull/2166 - refactoring the store hierarchies as requested

Author: Damian Guy
Reviewers: Guozhang Wang

Closes #2360 from dguy/state-store-refactor

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73b7ae00
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73b7ae00
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73b7ae00

Branch: refs/heads/trunk
Commit: 73b7ae0019d387407375f3865e263225c986a6ce
Parents: 825f225
Author: Damian Guy
Authored: Tue Jan 17 14:13:46 2017 -0800
Committer: Guozhang Wang
Committed: Tue Jan 17 14:13:46 2017 -0800

--
 .../kstream/internals/SessionKeySerde.java | 17 ++
 .../streams/state/WindowStoreIterator.java | 3 +-
 .../AbstractMergedSortedCacheStoreIterator.java | 166 +++
 .../state/internals/CachingKeyValueStore.java | 10 +-
 .../state/internals/CachingSessionStore.java | 63 ++
 .../state/internals/CachingWindowStore.java | 64 +++---
 .../ChangeLoggingKeyValueBytesStore.java | 93 +
 .../internals/ChangeLoggingKeyValueStore.java | 127
 .../ChangeLoggingSegmentedBytesStore.java | 28 +--
 .../internals/CompositeReadOnlyWindowStore.java | 5 +
 .../DelegatingPeekingKeyValueIterator.java | 10 +-
 .../MergedSortedCacheKeyValueStoreIterator.java | 130 ++--
 .../MergedSortedCacheSessionStoreIterator.java | 71 +++
 .../MergedSortedCacheWindowStoreIterator.java | 58 ++
 .../MergedSortedCachedWindowStoreIterator.java | 107 --
 .../state/internals/MeteredKeyValueStore.java | 27 +--
 .../internals/MeteredSegmentedBytesStore.java | 27 +--
 .../state/internals/MeteredWindowStore.java | 180
 .../internals/RocksDBKeyValueStoreSupplier.java | 54 +++--
 .../internals/RocksDBSessionStoreSupplier.java | 54 +++--
 .../streams/state/internals/RocksDBStore.java | 28 +--
 .../state/internals/RocksDBWindowStore.java | 25 ++-
 .../internals/RocksDBWindowStoreSupplier.java | 37 ++--
 .../internals/SerializedKeyValueIterator.java | 70 +++
 .../state/internals/WindowStoreUtils.java | 3 +
 .../state/internals/WrappedStateStore.java | 90
 .../internals/KGroupedStreamImplTest.java | 42 +++-
 .../internals/CachingSessionStoreTest.java | 3 +-
 .../state/internals/CachingWindowStoreTest.java | 5 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java | 165 +++
 .../ChangeLoggingKeyValueStoreTest.java | 207 +++
 .../DelegatingPeekingKeyValueIteratorTest.java | 12 +-
 ...rgedSortedCacheSessionStoreIteratorTest.java | 113 ++
 ...ergedSortedCacheWindowStoreIteratorTest.java | 35 +++-
 .../internals/ReadOnlyWindowStoreStub.java | 5 +
 .../RocksDBKeyValueStoreSupplierTest.java | 155 ++
 .../RocksDBSessionStoreSupplierTest.java | 169 +++
 .../RocksDBWindowStoreSupplierTest.java | 168 +++
 .../state/internals/RocksDBWindowStoreTest.java | 2 -
 .../SerializedKeyValueIteratorTest.java | 95 +
 40 files changed, 2081 insertions(+), 642 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/73b7ae00/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
index 48213d6..d9a3528 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import java.nio.ByteBuffer;
@@ -146,4 +147,20 @@ public class SessionKeySerde implements Serde {
         buf.putLong(sessionKey.window().start());
         return new Bytes(buf.array());
     }
+
+    public static Bytes bytesToBinary(final Windowed sessionKey) {
+        final byte[] bytes = sessionKey.key().get();
+        ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
+        buf.put(bytes);
+        buf.putLong(sessionKey.window().end());
+
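
The binary layout used by `bytesToBinary` in the diff above (raw key bytes followed by two 8-byte timestamps) can be sketched in isolation. The diff is truncated after the window end is written, so writing the start timestamp last is an assumption here, based on the `putLong(...start())` context line of the neighbouring method. The class and the helper names `encode`, `extractKey`, `extractEnd` and `extractStart` are hypothetical and exist only for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class SessionKeyLayoutDemo {
    static final int TIMESTAMP_SIZE = 8; // size of a long in bytes

    // Layout: [key bytes][window end (8 bytes)][window start (8 bytes)].
    // Assumption: start follows end, as in the surrounding context lines.
    static byte[] encode(byte[] key, long start, long end) {
        ByteBuffer buf = ByteBuffer.allocate(key.length + 2 * TIMESTAMP_SIZE);
        buf.put(key);
        buf.putLong(end);
        buf.putLong(start);
        return buf.array();
    }

    // The key occupies everything before the two trailing timestamps.
    static byte[] extractKey(byte[] binary) {
        byte[] key = new byte[binary.length - 2 * TIMESTAMP_SIZE];
        System.arraycopy(binary, 0, key, 0, key.length);
        return key;
    }

    static long extractEnd(byte[] binary) {
        return ByteBuffer.wrap(binary).getLong(binary.length - 2 * TIMESTAMP_SIZE);
    }

    static long extractStart(byte[] binary) {
        return ByteBuffer.wrap(binary).getLong(binary.length - TIMESTAMP_SIZE);
    }

    public static void main(String[] args) {
        byte[] binary = encode("user-1".getBytes(StandardCharsets.UTF_8), 100L, 250L);
        System.out.println("key=" + new String(extractKey(binary), StandardCharsets.UTF_8)
            + " start=" + extractStart(binary)
            + " end=" + extractEnd(binary));
    }
}
```

Because both timestamps have a fixed width and sit at the tail, the variable-length key can be recovered without storing its length.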
kafka git commit: KAFKA-4588: Wait for topics to be created in QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable
Repository: kafka Updated Branches: refs/heads/trunk 3f6c4f63c -> 825f225bc KAFKA-4588: Wait for topics to be created in QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable After debugging this i can see the times that it fails there is a race between when the topic is actually created/ready on the broker and when the assignment happens. When it fails `StreamPartitionAssignor.assign(..)` gets called with a `Cluster` with no topics. Hence the test hangs as no tasks get assigned. To fix this I added a `waitForTopics` method to `EmbeddedKafkaCluster`. This will wait until the topics have been created. Author: Damian GuyReviewers: Matthias J. Sax, Guozhang Wang Closes #2371 from dguy/integration-test-fix Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/825f225b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/825f225b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/825f225b Branch: refs/heads/trunk Commit: 825f225bc5706b16af8ec44ca47ee1452c11e6f3 Parents: 3f6c4f6 Author: Damian Guy Authored: Tue Jan 17 12:33:11 2017 -0800 Committer: Guozhang Wang Committed: Tue Jan 17 12:33:11 2017 -0800 -- checkstyle/import-control.xml | 1 + .../GlobalKTableIntegrationTest.java| 4 +- .../KStreamAggregationDedupIntegrationTest.java | 4 +- .../KStreamAggregationIntegrationTest.java | 4 +- .../KStreamKTableJoinIntegrationTest.java | 2 +- .../integration/KStreamRepartitionJoinTest.java | 8 ++-- .../QueryableStateIntegrationTest.java | 14 +++ .../integration/utils/EmbeddedKafkaCluster.java | 24 +-- .../integration/utils/IntegrationTestUtils.java | 43 .../integration/utils/KafkaEmbedded.java| 3 ++ .../org/apache/kafka/test/StreamsTestUtils.java | 1 + 11 files changed, 87 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/checkstyle/import-control.xml -- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 
b68cf98..04f364c 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -156,6 +156,7 @@ + http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java -- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 85b851d..6ac87ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -81,7 +81,7 @@ public class GlobalKTableIntegrationTest { private ForeachAction foreachAction; @Before -public void before() { +public void before() throws InterruptedException { testNo++; builder = new KStreamBuilder(); createTopics(); @@ -212,7 +212,7 @@ public class GlobalKTableIntegrationTest { } -private void createTopics() { +private void createTopics() throws InterruptedException { inputStream = "input-stream-" + testNo; inputTable = "input-table-" + testNo; globalOne = "globalOne-" + testNo; http://git-wip-us.apache.org/repos/asf/kafka/blob/825f225b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java -- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 9397e03..f2a767c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -72,7 +72,7 @@ public class KStreamAggregationDedupIntegrationTest { @Before -public void before() { +public void before() throws InterruptedException { 
         testNo++;
         builder = new KStreamBuilder();
         createTopics();
@@ -267,7 +267,7 @@ public class KStreamAggregationDedupIntegrationTest {
     }
-    private void createTopics() {
+    private void createTopics() throws InterruptedException {
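
The `waitForTopics` implementation itself is not part of this excerpt. As a rough illustration of the idea described in the commit message (block until the broker reports every expected topic, or give up after a timeout), a minimal Python sketch might look like the following. The function name, the `list_topics` callable, and the timeout values are all hypothetical and are not taken from the Kafka code base.

```python
import time


def wait_for_topics(list_topics, expected, timeout_s=30.0, poll_interval_s=0.1):
    """Poll until every topic in `expected` is reported by `list_topics`.

    `list_topics` is a hypothetical zero-argument callable that returns the
    topic names currently known to the broker.
    """
    expected = set(expected)
    deadline = time.monotonic() + timeout_s
    while True:
        missing = expected - set(list_topics())
        if not missing:
            return
        if time.monotonic() >= deadline:
            raise TimeoutError("topics not created in time: %s" % sorted(missing))
        # Back off briefly before asking the broker again.
        time.sleep(poll_interval_s)
```

A test would call such a helper right after creating its topics and before starting the streams application, so that the partition assignment never sees an empty cluster view.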
kafka git commit: KAFKA-4588: Wait for topics to be created in QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 60d759a22 -> c9b9acf6a

KAFKA-4588: Wait for topics to be created in QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable

After debugging this I can see that, when it fails, there is a race between when the topic is actually created/ready on the broker and when the assignment happens. When it fails, `StreamPartitionAssignor.assign(..)` gets called with a `Cluster` with no topics. Hence the test hangs, as no tasks get assigned.

To fix this I added a `waitForTopics` method to `EmbeddedKafkaCluster`. This will wait until the topics have been created.

Author: Damian Guy
Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2371 from dguy/integration-test-fix

(cherry picked from commit 825f225bc5706b16af8ec44ca47ee1452c11e6f3)
Signed-off-by: Guozhang Wang

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9b9acf6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9b9acf6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9b9acf6

Branch: refs/heads/0.10.2
Commit: c9b9acf6a8b542c2d0d825c17a4a20cf3fa5
Parents: 60d759a
Author: Damian Guy
Authored: Tue Jan 17 12:33:11 2017 -0800
Committer: Guozhang Wang
Committed: Tue Jan 17 12:33:20 2017 -0800

--
 checkstyle/import-control.xml | 1 +
 .../GlobalKTableIntegrationTest.java | 4 +-
 .../KStreamAggregationDedupIntegrationTest.java | 4 +-
 .../KStreamAggregationIntegrationTest.java | 4 +-
 .../KStreamKTableJoinIntegrationTest.java | 2 +-
 .../integration/KStreamRepartitionJoinTest.java | 8 ++--
 .../QueryableStateIntegrationTest.java | 14 +++
 .../integration/utils/EmbeddedKafkaCluster.java | 24 +--
 .../integration/utils/IntegrationTestUtils.java | 43
 .../integration/utils/KafkaEmbedded.java | 3 ++
 .../org/apache/kafka/test/StreamsTestUtils.java | 1 +
 11 files changed, 87 insertions(+), 21 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9b9acf6/checkstyle/import-control.xml
--
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b68cf98..04f364c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -156,6 +156,7 @@
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9b9acf6/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
--
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 85b851d..6ac87ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -81,7 +81,7 @@ public class GlobalKTableIntegrationTest {
     private ForeachAction foreachAction;
     @Before
-    public void before() {
+    public void before() throws InterruptedException {
         testNo++;
         builder = new KStreamBuilder();
         createTopics();
@@ -212,7 +212,7 @@ public class GlobalKTableIntegrationTest {
     }
-    private void createTopics() {
+    private void createTopics() throws InterruptedException {
         inputStream = "input-stream-" + testNo;
         inputTable = "input-table-" + testNo;
         globalOne = "globalOne-" + testNo;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9b9acf6/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
--
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 9397e03..f2a767c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -72,7 +72,7 @@ public class KStreamAggregationDedupIntegrationTest {
     @Before
-    public void before() {
+    public void before() throws InterruptedException {
         testNo++;
         builder = new KStreamBuilder();
         createTopics();
@@ -267,7 +267,7 @@ public class KStreamAggregationDedupIntegrationTest {
kafka git commit: KAFKA-4580; Use sasl.jaas.config for some system tests
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 e3f4cdd0e -> 2b19ad9d8

KAFKA-4580; Use sasl.jaas.config for some system tests

Switched console_consumer, verifiable_consumer and verifiable_producer to use the new sasl.jaas.config property instead of a static JAAS configuration file when used with SASL_PLAINTEXT.

Author: Rajini Sivaram
Reviewers: Ewen Cheslack-Postava, Ismael Juma

Closes #2323 from rajinisivaram/KAFKA-4580

(cherry picked from commit 3f6c4f63c9c17424cf717ca76c74554bcf3b2e9a)
Signed-off-by: Ismael Juma

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2b19ad9d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2b19ad9d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2b19ad9d

Branch: refs/heads/0.10.2
Commit: 2b19ad9d8c47fb0f78a6e90d2f5711df6110bf1f
Parents: e3f4cdd
Author: Rajini Sivaram
Authored: Tue Jan 17 18:42:55 2017 +
Committer: Ismael Juma
Committed: Tue Jan 17 18:43:25 2017 +

--
 tests/kafkatest/services/console_consumer.py | 4 +--
 .../services/security/security_config.py | 28 +++-
 .../services/security/templates/jaas.conf | 4 +++
 tests/kafkatest/services/verifiable_consumer.py | 5 ++--
 tests/kafkatest/services/verifiable_producer.py | 9 +++
 5 files changed, 35 insertions(+), 15 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b19ad9d/tests/kafkatest/services/console_consumer.py
--
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 17ddb6b..cdc46cd 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -150,7 +150,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         # Add security properties to the config. If security protocol is not specified,
         # use the default in the template properties.
-        self.security_config = self.kafka.security_config.client_config(prop_file)
+        self.security_config = self.kafka.security_config.client_config(prop_file, node)
+        self.security_config.setup_node(node)
         prop_file += str(self.security_config)
         return prop_file
@@ -231,7 +232,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         prop_file = self.prop_file(node)
         self.logger.info(prop_file)
         node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
-        self.security_config.setup_node(node)
         # Create and upload log properties
         log_config = self.render('tools_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b19ad9d/tests/kafkatest/services/security/security_config.py
--
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index 864e0a3..846d9b1 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -112,7 +112,7 @@ class SecurityConfig(TemplateRenderer):
     def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
                  client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
-                 zk_sasl=False, template_props=""):
+                 zk_sasl=False, template_props="", static_jaas_conf=True):
         """
         Initialize the security properties for the node and copy
         keystore and truststore to the remote node if the transport protocol
@@ -143,6 +143,7 @@ class SecurityConfig(TemplateRenderer):
         self.has_sasl = self.is_sasl(security_protocol) or self.is_sasl(interbroker_security_protocol) or zk_sasl
         self.has_ssl = self.is_ssl(security_protocol) or self.is_ssl(interbroker_security_protocol)
         self.zk_sasl = zk_sasl
+        self.static_jaas_conf = static_jaas_conf
         self.properties = {
             'security.protocol' : security_protocol,
             'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
@@ -156,8 +157,14 @@ class SecurityConfig(TemplateRenderer):
             'sasl.kerberos.service.name' : 'kafka'
         }
-    def client_config(self, template_props=""):
-        return SecurityConfig(self.context, self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
+    def client_config(self, template_props="", node=None):
+        # If node
kafka git commit: KAFKA-4580; Use sasl.jaas.config for some system tests
Repository: kafka
Updated Branches:
  refs/heads/trunk 7a84b241e -> 3f6c4f63c

KAFKA-4580; Use sasl.jaas.config for some system tests

Switched console_consumer, verifiable_consumer and verifiable_producer to use the new sasl.jaas.config property instead of a static JAAS configuration file when used with SASL_PLAINTEXT.

Author: Rajini Sivaram
Reviewers: Ewen Cheslack-Postava, Ismael Juma

Closes #2323 from rajinisivaram/KAFKA-4580

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f6c4f63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f6c4f63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f6c4f63

Branch: refs/heads/trunk
Commit: 3f6c4f63c9c17424cf717ca76c74554bcf3b2e9a
Parents: 7a84b24
Author: Rajini Sivaram
Authored: Tue Jan 17 18:42:55 2017 +
Committer: Ismael Juma
Committed: Tue Jan 17 18:42:55 2017 +

--
 tests/kafkatest/services/console_consumer.py | 4 +--
 .../services/security/security_config.py | 28 +++-
 .../services/security/templates/jaas.conf | 4 +++
 tests/kafkatest/services/verifiable_consumer.py | 5 ++--
 tests/kafkatest/services/verifiable_producer.py | 9 +++
 5 files changed, 35 insertions(+), 15 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f6c4f63/tests/kafkatest/services/console_consumer.py
--
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 17ddb6b..cdc46cd 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -150,7 +150,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         # Add security properties to the config. If security protocol is not specified,
         # use the default in the template properties.
-        self.security_config = self.kafka.security_config.client_config(prop_file)
+        self.security_config = self.kafka.security_config.client_config(prop_file, node)
+        self.security_config.setup_node(node)
         prop_file += str(self.security_config)
         return prop_file
@@ -231,7 +232,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         prop_file = self.prop_file(node)
         self.logger.info(prop_file)
         node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
-        self.security_config.setup_node(node)
         # Create and upload log properties
         log_config = self.render('tools_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f6c4f63/tests/kafkatest/services/security/security_config.py
--
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index 864e0a3..846d9b1 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -112,7 +112,7 @@ class SecurityConfig(TemplateRenderer):
     def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
                  client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
-                 zk_sasl=False, template_props=""):
+                 zk_sasl=False, template_props="", static_jaas_conf=True):
         """
         Initialize the security properties for the node and copy
         keystore and truststore to the remote node if the transport protocol
@@ -143,6 +143,7 @@ class SecurityConfig(TemplateRenderer):
         self.has_sasl = self.is_sasl(security_protocol) or self.is_sasl(interbroker_security_protocol) or zk_sasl
         self.has_ssl = self.is_ssl(security_protocol) or self.is_ssl(interbroker_security_protocol)
         self.zk_sasl = zk_sasl
+        self.static_jaas_conf = static_jaas_conf
         self.properties = {
             'security.protocol' : security_protocol,
             'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
@@ -156,8 +157,14 @@ class SecurityConfig(TemplateRenderer):
             'sasl.kerberos.service.name' : 'kafka'
         }
-    def client_config(self, template_props=""):
-        return SecurityConfig(self.context, self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
+    def client_config(self, template_props="", node=None):
+        # If node is not specified, use static jaas config which will be created later.
+        # Otherwise use static JAAS
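
The body of the new `client_config` is cut off in this excerpt, so the exact selection logic is not visible here. Going only by the commit message (the `sasl.jaas.config` property replaces the static JAAS file when the client uses SASL_PLAINTEXT) and by the visible comment (no node means static JAAS config), the decision could be sketched roughly as below. The helper name `use_static_jaas_conf` is hypothetical and this is not the code from `security_config.py`.

```python
SASL_PLAINTEXT = "SASL_PLAINTEXT"
SASL_SSL = "SASL_SSL"


def use_static_jaas_conf(security_protocol, node=None):
    """Return True when a static JAAS file should be used for the client.

    Assumption drawn from the commit message: the sasl.jaas.config property
    is used only for SASL_PLAINTEXT, and only when a node is supplied.
    """
    if node is None:
        # No node given: fall back to the static JAAS file created later.
        return True
    return security_protocol != SASL_PLAINTEXT
```

With a rule like this, existing tests keep exercising the static-file path for every other protocol, while SASL_PLAINTEXT clients exercise the property-based path.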
kafka git commit: MINOR: Some cleanups and additional testing for KIP-88
Repository: kafka
Updated Branches:
  refs/heads/trunk 55abe65e0 -> 7a84b241e

MINOR: Some cleanups and additional testing for KIP-88

Author: Jason Gustafson
Reviewers: Vahid Hashemian, Ismael Juma

Closes #2383 from hachikuji/minor-cleanup-kip-88

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7a84b241
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7a84b241
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7a84b241

Branch: refs/heads/trunk
Commit: 7a84b241eeb8cb63400a9512b066c3f733f94b8c
Parents: 55abe65
Author: Jason Gustafson
Authored: Tue Jan 17 10:42:05 2017 -0800
Committer: Jason Gustafson
Committed: Tue Jan 17 10:42:05 2017 -0800

--
 .../common/requests/OffsetFetchRequest.java | 24 +-
 .../common/requests/OffsetFetchResponse.java | 83 ++-
 .../clients/consumer/KafkaConsumerTest.java | 2 +-
 .../internals/ConsumerCoordinatorTest.java | 4 +-
 .../common/requests/RequestResponseTest.java | 23 --
 .../scala/kafka/api/OffsetFetchRequest.scala | 1 -
 .../kafka/coordinator/GroupCoordinator.scala | 4 +-
 .../coordinator/GroupMetadataManager.scala | 31 ---
 .../src/main/scala/kafka/server/KafkaApis.scala | 86
 .../kafka/api/AuthorizerIntegrationTest.scala | 37 -
 .../GroupCoordinatorResponseTest.scala | 58 -
 11 files changed, 211 insertions(+), 142 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 0ff49be..553fd96 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.common.utils.Utils;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import
java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -78,7 +79,7 @@ public class OffsetFetchRequest extends AbstractRequest {
     private final List partitions;
     public static OffsetFetchRequest forAllPartitions(String groupId) {
-        return new OffsetFetchRequest.Builder(groupId, (List) null).setVersion((short) 2).build();
+        return new OffsetFetchRequest.Builder(groupId, null).setVersion((short) 2).build();
     }
     // v0, v1, and v2 have the same fields.
@@ -131,20 +132,35 @@ public class OffsetFetchRequest extends AbstractRequest {
         groupId = struct.getString(GROUP_ID_KEY_NAME);
     }
-    @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public OffsetFetchResponse getErrorResponse(Errors error) {
         short versionId = version();
+
+        Map responsePartitions = new HashMap<>();
+        if (versionId < 2) {
+            for (TopicPartition partition : this.partitions) {
+                responsePartitions.put(partition, new OffsetFetchResponse.PartitionData(
+                        OffsetFetchResponse.INVALID_OFFSET,
+                        OffsetFetchResponse.NO_METADATA,
+                        error));
+            }
+        }
+
         switch (versionId) {
             case 0:
             case 1:
             case 2:
-                return new OffsetFetchResponse(Errors.forException(e), partitions, versionId);
+                return new OffsetFetchResponse(error, responsePartitions, versionId);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id)));
         }
     }
+    @Override
+    public OffsetFetchResponse getErrorResponse(Throwable e) {
+        return getErrorResponse(Errors.forException(e));
+    }
+
     public String groupId() {
         return groupId;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 0095f38..9c14155 100644
---
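
The reworked `getErrorResponse(Errors error)` in the hunk above fills in a per-partition error entry only for request versions below 2, and always passes the error through as the top-level error. The same control flow can be modeled in a few lines of Python. The dictionary and tuple shapes and the two placeholder constants below are illustrative assumptions; they stand in for `OffsetFetchResponse` and `PartitionData` and are not the Java classes themselves.

```python
# Placeholder stand-ins for OffsetFetchResponse.INVALID_OFFSET / NO_METADATA;
# the concrete values are assumptions made for this sketch.
INVALID_OFFSET = -1
NO_METADATA = ""


def error_response(version, partitions, error):
    """Model of the versioned error response built in the diff above."""
    response_partitions = {}
    if version < 2:
        # Older versions get one error entry per requested partition.
        for partition in partitions:
            response_partitions[partition] = (INVALID_OFFSET, NO_METADATA, error)
    if version not in (0, 1, 2):
        raise ValueError("Version %d is not valid" % version)
    return {"error": error, "partitions": response_partitions, "version": version}
```

For a version 2 request the partition list may be absent (as in `forAllPartitions`), which is why the loop is guarded by the version check rather than run unconditionally.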
[1/2] kafka git commit: KAFKA-4363; Documentation for sasl.jaas.config property
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 7946b2f35 -> e3f4cdd0e

KAFKA-4363; Documentation for sasl.jaas.config property

Author: Rajini Sivaram
Reviewers: Ismael Juma

Closes #2316 from rajinisivaram/KAFKA-4363

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/621dff22
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/621dff22
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/621dff22

Branch: refs/heads/0.10.2
Commit: 621dff22e79dc64b9a8748186dd985774044f91a
Parents: 7946b2f
Author: Rajini Sivaram
Authored: Tue Jan 17 11:16:29 2017 +
Committer: Ismael Juma
Committed: Tue Jan 17 12:56:37 2017 +

--
 docs/security.html | 147
 1 file changed, 99 insertions(+), 48 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/621dff22/docs/security.html
--
diff --git a/docs/security.html b/docs/security.html
index 350f0cf..8cb867e 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -263,23 +263,65 @@ SASL configuration for Kafka clients
 SASL authentication is only supported for the new Java Kafka producer and
-consumer, the older API is not supported. To configure SASL authentication
-on the clients:
+consumer, the older API is not supported. JAAS configuration for clients may
+be specified as a static JAAS config file or using the client configuration property
+sasl.jaas.config.
+To configure SASL authentication on the clients:
 Select a SASL mechanism for authentication.
-Add a JAAS config file for the selected mechanism as described in the examples
-for setting up GSSAPI (Kerberos)
-or PLAIN. KafkaClient is the
-section name in the JAAS file used by Kafka clients.
-Pass the JAAS config file location as JVM parameter to each client JVM.
For example: - -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf Configure the following properties in producer.properties or consumer.properties: security.protocol=SASL_PLAINTEXT (or SASL_SSL) -sasl.mechanism=GSSAPI (or PLAIN) +sasl.mechanism=GSSAPI (or PLAIN) Follow the steps in GSSAPI (Kerberos) or PLAIN to configure SASL for the selected mechanism. +Configure JAAS using client configuration property +or static JAAS config file as described below. + + +JAAS configuration using client configuration property +Clients may specify JAAS configuration as a producer or consumer property without +creating a physical configuration file. This mode also enables different producers +and consumers within the same JVM to use different credentials by specifying +different properties for each client. If both static JAAS configuration system property +java.security.auth.login.config and client property sasl.jaas.config +are specified, the client property will be used. + +To configure SASL authentication on the clients using configuration property: + +Configure the property sasl.jaas.config in producer.properties or +consumer.properties to be the JAAS login module section of the selected mechanism. +For example, PLAIN +credentials may be configured as: + sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret"; +See GSSAPI (Kerberos) or PLAIN +for full example configurations. + + +JAAS configuration using static config file +To configure SASL authentication on the clients using static JAAS config file: + +Add a JAAS config file with a client login section named KafkaClient. Configure +a login module in KafkaClient for the selected mechanism as described in the examples +for setting up GSSAPI (Kerberos) +or PLAIN. 
+For example, GSSAPI
+credentials may be configured as:
+
+KafkaClient {
+    com.sun.security.auth.module.Krb5LoginModule required
+    useKeyTab=true
+    storeKey=true
+    keyTab="/etc/security/keytabs/kafka_client.keytab"
+    principal="kafka-clien...@example.com";
+};
+See GSSAPI (Kerberos) or PLAIN
+for
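
To make the two points in the documentation change above concrete (the shape of a `sasl.jaas.config` value for PLAIN, and the rule that the client property wins over the `java.security.auth.login.config` system property when both are set), here is a small Python sketch. Both helper names are hypothetical, the value is built by plain string formatting with no escaping of quotes inside credentials, and none of this is Kafka client code.

```python
def plain_jaas_config(username, password):
    """Render a sasl.jaas.config value for the PLAIN login module.

    Assumes the credentials contain no double quotes; no escaping is done.
    """
    return ('org.apache.kafka.common.security.plain.PlainLoginModule required '
            'username="%s" password="%s";' % (username, password))


def effective_jaas_source(client_props, system_props):
    """Name the setting that supplies the JAAS configuration, if any."""
    if "sasl.jaas.config" in client_props:
        # Per the documentation text above, the client property takes precedence.
        return "sasl.jaas.config"
    if "java.security.auth.login.config" in system_props:
        return "java.security.auth.login.config"
    return None
```

Because the property is per client, two producers in one JVM could each carry a different rendered value, which is the use case the documentation text calls out.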
kafka git commit: KAFKA-4590; SASL/SCRAM system tests
Repository: kafka
Updated Branches:
  refs/heads/trunk b4d8668d6 -> 55abe65e0

KAFKA-4590; SASL/SCRAM system tests

Runs the sanity test and one replication test using SASL/SCRAM.

Author: Rajini Sivaram
Reviewers: Ewen Cheslack-Postava, Ismael Juma

Closes #2355 from rajinisivaram/KAFKA-4590

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/55abe65e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/55abe65e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/55abe65e

Branch: refs/heads/trunk
Commit: 55abe65e0996008d54bd5bb8440906ac4a359937
Parents: b4d8668
Author: Rajini Sivaram
Authored: Tue Jan 17 12:55:07 2017 +
Committer: Ismael Juma
Committed: Tue Jan 17 12:55:07 2017 +

--
 .../sanity_checks/test_console_consumer.py | 2 +-
 tests/kafkatest/services/kafka/kafka.py | 6 +
 .../services/security/security_config.py | 25
 .../services/security/templates/jaas.conf | 9 +++
 tests/kafkatest/tests/core/replication_test.py | 6 -
 5 files changed, 46 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/55abe65e/tests/kafkatest/sanity_checks/test_console_consumer.py
--
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 38db057..066d6d4 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -47,7 +47,7 @@ class ConsoleConsumerTest(Test):
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
     @matrix(security_protocol=['PLAINTEXT', 'SSL'])
     @cluster(num_nodes=4)
-    @parametrize(security_protocol='SASL_SSL', sasl_mechanism='PLAIN')
+    @matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'])
     @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
     def test_lifecycle(self, security_protocol, new_consumer=True, sasl_mechanism='GSSAPI'):
         """Check that console consumer starts/stops
properly, and that we are capturing log output."""

http://git-wip-us.apache.org/repos/asf/kafka/blob/55abe65e/tests/kafkatest/services/kafka/kafka.py
--
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 716c2d2..8ef0f35 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -208,6 +208,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         node.account.create_file(self.LOG4J_CONFIG, self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR))
         self.security_config.setup_node(node)
+        self.security_config.setup_credentials(node, self.path, self.zk.connect_setting(), broker=True)
         cmd = self.start_cmd(node)
         self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
@@ -215,6 +216,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             node.account.ssh(cmd)
             monitor.wait_until("Kafka Server.*started", timeout_sec=30, backoff_sec=.25, err_msg="Kafka server didn't finish startup")
+        # Credentials for inter-broker communication are created before starting Kafka.
+        # Client credentials are created after starting Kafka so that both loading of
+        # existing credentials from ZK and dynamic update of credentials in Kafka are tested.
+        self.security_config.setup_credentials(node, self.path, self.zk.connect_setting(), broker=False)
+
         self.start_jmx_tool(self.idx(node), node)
         if len(self.pids(node)) == 0:
             raise Exception("No process ids recorded on node %s" % str(node))

http://git-wip-us.apache.org/repos/asf/kafka/blob/55abe65e/tests/kafkatest/services/security/security_config.py
--
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index 9b29217..864e0a3 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -94,6 +94,12 @@ class SecurityConfig(TemplateRenderer):
     SASL_SSL = 'SASL_SSL'
     SASL_MECHANISM_GSSAPI = 'GSSAPI'
     SASL_MECHANISM_PLAIN = 'PLAIN'
+    SASL_MECHANISM_SCRAM_SHA_256 = 'SCRAM-SHA-256'
+    SASL_MECHANISM_SCRAM_SHA_512 = 'SCRAM-SHA-512'
+    SCRAM_CLIENT_USER = "kafka-client"
+
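
The `test_console_consumer.py` hunk in this commit swaps a single `@parametrize` case for a `@matrix` over three SASL mechanisms. Assuming the decorator expands its keyword lists into their Cartesian product (this is a stand-alone illustration, not ducktape's implementation, and `expand_matrix` is a hypothetical name), the resulting test cases can be enumerated like this:

```python
from itertools import product


def expand_matrix(**params):
    """Return one keyword dict per combination of the given value lists."""
    names = sorted(params)
    combos = product(*(params[name] for name in names))
    return [dict(zip(names, combo)) for combo in combos]


cases = expand_matrix(
    security_protocol=['SASL_SSL'],
    sasl_mechanism=['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'],
)
```

Under that assumption the change turns one SASL_SSL/PLAIN run into three runs, one per mechanism, all over SASL_SSL.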
kafka git commit: KAFKA-4363; Documentation for sasl.jaas.config property
Repository: kafka
Updated Branches:
  refs/heads/trunk b62804a25 -> b4d8668d6

KAFKA-4363; Documentation for sasl.jaas.config property

Author: Rajini Sivaram
Reviewers: Ismael Juma

Closes #2316 from rajinisivaram/KAFKA-4363

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b4d8668d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b4d8668d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b4d8668d

Branch: refs/heads/trunk
Commit: b4d8668d6d47fca8cc6de4f550d703cd8926c436
Parents: b62804a
Author: Rajini Sivaram
Authored: Tue Jan 17 11:16:29 2017 +
Committer: Ismael Juma
Committed: Tue Jan 17 11:16:29 2017 +

--
 docs/security.html | 147
 1 file changed, 99 insertions(+), 48 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/b4d8668d/docs/security.html
--
diff --git a/docs/security.html b/docs/security.html
index 350f0cf..8cb867e 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -263,23 +263,65 @@ SASL configuration for Kafka clients
 SASL authentication is only supported for the new Java Kafka producer and
-consumer, the older API is not supported. To configure SASL authentication
-on the clients:
+consumer, the older API is not supported. JAAS configuration for clients may
+be specified as a static JAAS config file or using the client configuration property
+sasl.jaas.config.
+To configure SASL authentication on the clients:
 Select a SASL mechanism for authentication.
-Add a JAAS config file for the selected mechanism as described in the examples
-for setting up GSSAPI (Kerberos)
-or PLAIN. KafkaClient is the
-section name in the JAAS file used by Kafka clients.
-Pass the JAAS config file location as JVM parameter to each client JVM.
For example: - -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf Configure the following properties in producer.properties or consumer.properties: security.protocol=SASL_PLAINTEXT (or SASL_SSL) -sasl.mechanism=GSSAPI (or PLAIN) +sasl.mechanism=GSSAPI (or PLAIN) Follow the steps in GSSAPI (Kerberos) or PLAIN to configure SASL for the selected mechanism. +Configure JAAS using client configuration property +or static JAAS config file as described below. + + +JAAS configuration using client configuration property +Clients may specify JAAS configuration as a producer or consumer property without +creating a physical configuration file. This mode also enables different producers +and consumers within the same JVM to use different credentials by specifying +different properties for each client. If both static JAAS configuration system property +java.security.auth.login.config and client property sasl.jaas.config +are specified, the client property will be used. + +To configure SASL authentication on the clients using configuration property: + +Configure the property sasl.jaas.config in producer.properties or +consumer.properties to be the JAAS login module section of the selected mechanism. +For example, PLAIN +credentials may be configured as: + sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret"; +See GSSAPI (Kerberos) or PLAIN +for full example configurations. + + +JAAS configuration using static config file +To configure SASL authentication on the clients using static JAAS config file: + +Add a JAAS config file with a client login section named KafkaClient. Configure +a login module in KafkaClient for the selected mechanism as described in the examples +for setting up GSSAPI (Kerberos) +or PLAIN. 
+For example, GSSAPI
+credentials may be configured as:
+
+KafkaClient {
+    com.sun.security.auth.module.Krb5LoginModule required
+    useKeyTab=true
+    storeKey=true
+    keyTab="/etc/security/keytabs/kafka_client.keytab"
+    principal="kafka-clien...@example.com";
+};
+See GSSAPI (Kerberos) or PLAIN
+for