This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 0.10.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.10.2 by this push:
new 67b0a94  KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4758)
67b0a94 is described below
commit 67b0a94fa2856928809435ad8f442dd3c96ba544
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Mar 26 16:31:47 2018 -0700
KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4758)
Introduces new config parameter `upgrade.from`.
Reviewers: Guozhang Wang <[email protected]>, Bill Bejeck <[email protected]>
---
bin/kafka-run-class.sh | 40 +++-
build.gradle | 24 ++
.../authenticator/SaslClientCallbackHandler.java | 9 +-
docs/streams.html | 39 +++-
docs/upgrade.html | 22 ++
gradle/dependencies.gradle | 4 +
settings.gradle | 3 +-
.../org/apache/kafka/streams/StreamsConfig.java | 19 +-
.../internals/StreamPartitionAssignor.java | 18 +-
.../internals/assignment/AssignmentInfo.java | 7 +-
.../internals/assignment/SubscriptionInfo.java | 5 +-
.../org/apache/kafka/streams/KafkaStreamsTest.java | 36 +--
.../apache/kafka/streams/StreamsConfigTest.java | 42 ++--
.../streams/integration/FanoutIntegrationTest.java | 2 +
.../KStreamAggregationDedupIntegrationTest.java | 11 +-
.../KStreamAggregationIntegrationTest.java | 12 +-
.../integration/QueryableStateIntegrationTest.java | 7 +-
.../internals/StreamPartitionAssignorTest.java | 125 ++++++++---
.../internals/assignment/AssignmentInfoTest.java | 3 +-
.../kafka/streams/tests/SmokeTestClient.java | 10 +-
.../kafka/streams/tests/SmokeTestDriver.java | 38 ++--
.../apache/kafka/streams/tests/SmokeTestUtil.java | 11 +-
.../kafka/streams/tests/StreamsSmokeTest.java | 14 +-
.../kafka/streams/tests/StreamsUpgradeTest.java | 73 +++++++
.../kafka/streams/tests/StreamsUpgradeTest.java | 104 +++++++++
.../kafka/streams/tests/StreamsUpgradeTest.java | 114 ++++++++++
tests/kafkatest/services/streams.py | 173 ++++++++++++++-
.../tests/streams/streams_upgrade_test.py | 242 +++++++++++++++++++++
tests/kafkatest/version.py | 1 +
vagrant/base.sh | 2 +
30 files changed, 1060 insertions(+), 150 deletions(-)
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index af10f61..a258681 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,50 @@ do
fi
done
-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
- if should_include_file "$file"; then
- CLASSPATH="$CLASSPATH":"$file"
- fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+ clients_lib_dir=$(dirname $0)/../clients/build/libs
+ streams_lib_dir=$(dirname $0)/../streams/build/libs
+  rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+ clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+ streams_lib_dir=$clients_lib_dir
+ rocksdb_lib_dir=$streams_lib_dir
+fi
+
-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
-for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
-for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+ do
+ if should_include_file "$file"; then
+ CLASSPATH="$CLASSPATH":"$file"
+ fi
+ done
+else
+  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
+  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, i.e., the bug-fix number
+  for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
+ do
+ if should_include_file "$file"; then
+ CLASSPATH="$CLASSPATH":"$file"
+ fi
+ done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
do
CLASSPATH="$CLASSPATH":"$file"
done
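The version-string handling in the branch above reduces to: strip the dots, then drop the trailing bug-fix digit to obtain the module suffix. A sketch of the same transformation in Java (the version value is a made-up example):

    String testVersion = "0.10.1.1";                               // hypothetical UPGRADE_KAFKA_STREAMS_TEST_VERSION
    String noDots = testVersion.replace(".", "");                  // "01011"
    String shortNoDots = noDots.substring(0, noDots.length() - 1); // "0101", the bug-fix digit dropped
    // jars are then taken from streams/upgrade-system-tests-0101/build/libs/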
diff --git a/build.gradle b/build.gradle
index 20a184c..5e97f90 100644
--- a/build.gradle
+++ b/build.gradle
@@ -770,6 +770,30 @@ project(':streams:examples') {
}
}
+project(':streams:upgrade-system-tests-0100') {
+ archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+ dependencies {
+ testCompile libs.kafkaStreams_0100
+ }
+
+ systemTestLibs {
+ dependsOn testJar
+ }
+}
+
+project(':streams:upgrade-system-tests-0101') {
+ archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
+
+ dependencies {
+ testCompile libs.kafkaStreams_0101
+ }
+
+ systemTestLibs {
+ dependsOn testJar
+ }
+}
+
project(':log4j-appender') {
archivesBaseName = "kafka-log4j-appender"
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 6094b54..b80dfcc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -17,7 +17,9 @@
*/
package org.apache.kafka.common.security.authenticator;
-import java.util.Map;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
@@ -26,10 +28,7 @@ import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
-
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import java.util.Map;
/**
 * Callback handler for Sasl clients. The callbacks required for the SASL mechanism
diff --git a/docs/streams.html b/docs/streams.html
index fe0e84e..d691e63 100644
--- a/docs/streams.html
+++ b/docs/streams.html
@@ -808,20 +808,49 @@ $ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
</p>
<p>
+                Upgrading from 0.10.0.x to 0.10.2.x directly is also possible.
+                See <a href="#streams_api_changes_0102">Streams API changes in 0.10.2</a> and <a href="#streams_api_changes_0101">Streams API changes in 0.10.1</a>
+                for a complete list of API changes.
+                Upgrading to 0.10.2.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for the first upgrade phase
+                (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+                As an alternative, an offline upgrade is also possible.
+            </p>
+            <ul>
+                <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.2.2 </li>
+                <li> bounce each instance of your application once </li>
+                <li> prepare your newly deployed 0.10.2.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
+                <li> bounce each instance of your application once more to complete the upgrade </li>
+            </ul>
+            <p> Upgrading from 0.10.0.x to 0.10.2.0 or 0.10.2.1 requires an offline upgrade (rolling bounce upgrade is not supported) </p>
+            <ul>
+                <li> stop all old (0.10.0.x) application instances </li>
+                <li> update your code and swap old code and jar file with new code and new jar file </li>
+                <li> restart all new (0.10.2.0 or 0.10.2.1) application instances </li>
+            </ul>
+
+            <p>
                If you want to upgrade from 0.10.0.x to 0.10.1, see the <a href="/{{version}}/documentation/#upgrade_1010_streams">Upgrade Section for 0.10.1</a>.
                It highlights incompatible changes you need to consider to upgrade your code and application.
                See <a href="#streams_api_changes_0101">below</a> for a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
            </p>
-    <h3><a id="streams_api_changes_01021" href="#streams_api_changes_0102">Notable changes in 0.10.2.1</a></h3>
-    <p>
+        <h3><a id="streams_api_changes_01022" href="#streams_api_changes_0102">Notable changes in 0.10.2.2</a></h3>
+        <p>
+            Parameter updates in <code>StreamsConfig</code>:
+        </p>
+        <ul>
+            <li> New configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
+        </ul>
+
+        <h3><a id="streams_api_changes_01021" href="#streams_api_changes_0102">Notable changes in 0.10.2.1</a></h3>
+        <p>
            Parameter updates in <code>StreamsConfig</code>:
        </p>
-    <ul>
+        <ul>
            <li> of particular importance to improve the resiliency of a Kafka Streams application are two changes to default parameters of producer <code>retries</code> and consumer <code>max.poll.interval.ms</code> </li>
-    </ul>
-    <h3><a id="streams_api_changes_0102" href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3>
+        </ul>
+        <h3><a id="streams_api_changes_0102" href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3>
<p>
New methods in <code>KafkaStreams</code>:
</p>
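The two-phase procedure documented above amounts to a single config difference between the two deployment rounds; a hedged sketch (application setup elided):

    // phase 1: every bounced 0.10.2.2 instance keeps sending version-1 metadata
    props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100);

    // phase 2: once all instances run 0.10.2.2, remove the setting and bounce again;
    // the group then rebalances on the current version-2 metadata
    props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);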
diff --git a/docs/upgrade.html b/docs/upgrade.html
index d7581fa..7747762 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -61,6 +61,11 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.
    <li> See <a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details. </li>
</ul>
+<h5><a id="upgrade_10202_notable" href="#upgrade_10202_notable">Notable changes in 0.10.2.2</a></h5>
+<ul>
+    <li> New configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
+</ul>
+
<h5><a id="upgrade_10201_notable" href="#upgrade_10201_notable">Notable changes in 0.10.2.1</a></h5>
<ul>
    <li> The default values for two configurations of the StreamsConfig class were changed to improve the resiliency of Kafka Streams applications. The internal Kafka Streams producer <code>retries</code> default value was changed from 0 to 10. The internal Kafka Streams consumer <code>max.poll.interval.ms</code> default value was changed from 300000 to <code>Integer.MAX_VALUE</code>.
@@ -141,6 +146,23 @@ only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older
    <li> Upgrading your Streams application from 0.10.0 to 0.10.1 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.1 application can only connect to 0.10.1 brokers. </li>
    <li> There are a couple of API changes that are not backward compatible (cf. <a href="/{{version}}/documentation/streams#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
         Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
+    <li> Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for the first upgrade phase
+         (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+         As an alternative, an offline upgrade is also possible.
+        <ul>
+            <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.1.2 </li>
+            <li> bounce each instance of your application once </li>
+            <li> prepare your newly deployed 0.10.1.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
+            <li> bounce each instance of your application once more to complete the upgrade </li>
+        </ul>
+    </li>
+    <li> Upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1 requires an offline upgrade (rolling bounce upgrade is not supported)
+        <ul>
+            <li> stop all old (0.10.0.x) application instances </li>
+            <li> update your code and swap old code and jar file with new code and new jar file </li>
+            <li> restart all new (0.10.1.0 or 0.10.1.1) application instances </li>
+        </ul>
+    </li>
</ul>
<h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 25faa90..4084b12 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -31,6 +31,8 @@ versions += [
jackson: "2.8.5",
jetty: "9.2.22.v20170606",
jersey: "2.24",
+ kafka_0100: "0.10.0.1",
+ kafka_0101: "0.10.1.1",
log4j: "1.2.17",
jopt: "5.0.3",
junit: "4.12",
@@ -92,6 +94,8 @@ libs += [
junit: "junit:junit:$versions.junit",
log4j: "log4j:log4j:$versions.log4j",
joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
+ kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
+ kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
lz4: "net.jpountz.lz4:lz4:$versions.lz4",
metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
powermock: "org.powermock:powermock-module-junit4:$versions.powermock",
diff --git a/settings.gradle b/settings.gradle
index 29d3895..576b40b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,5 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
+include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'streams:upgrade-system-tests-0100',
+        'streams:upgrade-system-tests-0101', 'log4j-appender',
         'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file'
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0724571..3baa078 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -95,6 +95,16 @@ public class StreamsConfig extends AbstractConfig {
*/
public static final String PRODUCER_PREFIX = "producer.";
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
+     */
+    public static final String UPGRADE_FROM_0100 = "0.10.0";
+
+    /** {@code upgrade.from} */
+    public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
+    public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
+        "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
+
/** {@code state.dir} */
public static final String STATE_DIR_CONFIG = "state.dir";
    private static final String STATE_DIR_DOC = "Directory location for state store.";
@@ -383,7 +393,13 @@ public class StreamsConfig extends AbstractConfig {
40 * 1000,
atLeast(0),
ConfigDef.Importance.MEDIUM,
- REQUEST_TIMEOUT_MS_DOC);
+ REQUEST_TIMEOUT_MS_DOC)
+ .define(UPGRADE_FROM_CONFIG,
+ ConfigDef.Type.STRING,
+ null,
+ in(null, UPGRADE_FROM_0100),
+ ConfigDef.Importance.LOW,
+ UPGRADE_FROM_DOC);
}
// this is the list of configs for underlying clients
@@ -501,6 +517,7 @@ public class StreamsConfig extends AbstractConfig {
        consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
        // add configs required for stream partition assignor
+       consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG));
        consumerProps.put(InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
        consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
        consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
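Because the new key is defined with the `in(null, UPGRADE_FROM_0100)` validator, unsupported values are rejected when the config is parsed. A sketch of the expected behavior, assuming the usual `ConfigException` surfaces on construction as for other validated configs:

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "validator-demo");    // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "0.9.0");               // not an accepted value
    try {
        new StreamsConfig(props);
    } catch (final ConfigException expected) {
        // only null or "0.10.0" pass validation
    }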
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index a50a819..889d2ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -155,6 +154,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
private String userEndPoint;
private int numStandbyReplicas;
+ private int userMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
+
private Cluster metadataWithInternalTopics;
private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
@@ -182,6 +183,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
    public void configure(Map<String, ?> configs) {
        numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+       final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+       if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
+           log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x.");
+           userMetadataVersion = 1;
+       }
+
        Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
        if (o == null) {
            KafkaException ex = new KafkaException("StreamThread is not specified");
@@ -241,7 +248,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
        Set<TaskId> prevTasks = streamThread.prevTasks();
        Set<TaskId> standbyTasks = streamThread.cachedTasks();
        standbyTasks.removeAll(prevTasks);
-       SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPoint);
+       SubscriptionInfo data = new SubscriptionInfo(userMetadataVersion, streamThread.processId, prevTasks, standbyTasks, this.userEndPoint);
        if (streamThread.builder.sourceTopicPattern() != null) {
            SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
@@ -279,11 +286,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
        // construct the client metadata from the decoded subscription info
        Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
+       int minUserMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
        for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
            String consumerId = entry.getKey();
            Subscription subscription = entry.getValue();
            SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+           final int usedVersion = info.version;
+           if (usedVersion < minUserMetadataVersion) {
+               minUserMetadataVersion = usedVersion;
+           }
            // create the new client metadata if necessary
            ClientMetadata clientMetadata = clientsMetadata.get(info.processId);
@@ -539,7 +551,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
            }
            // finally, encode the assignment before sending back to coordinator
-           assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
+           assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState).encode()));
            i++;
        }
    }
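The leader-side negotiation above boils down to taking the minimum version across all received subscriptions; a small standalone sketch of that logic (the decoded version list is hypothetical):

    int minUserMetadataVersion = SubscriptionInfo.CURRENT_VERSION;  // 2
    for (final int usedVersion : Arrays.asList(2, 1, 2)) {          // versions decoded from the subscriptions
        if (usedVersion < minUserMetadataVersion) {
            minUserMetadataVersion = usedVersion;
        }
    }
    // every member's AssignmentInfo is then encoded with version 1, so a single
    // not-yet-bounced 0.10.0.x-style member keeps the whole group compatible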
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index ddbd67d..7a6bf14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.common.record.ByteBufferInputStream;
@@ -56,7 +55,7 @@ public class AssignmentInfo {
this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
}
-    protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
+    public AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
                          Map<HostInfo, Set<TopicPartition>> hostState) {
this.version = version;
this.activeTasks = activeTasks;
@@ -155,9 +154,7 @@ public class AssignmentInfo {
}
}
-            return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
-
-
+            return new AssignmentInfo(version, activeTasks, standbyTasks, hostStateToTopicPartitions);
        } catch (IOException ex) {
            throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index c3481c0..92c50a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.errors.TaskAssignmentException;
@@ -32,7 +31,7 @@ public class SubscriptionInfo {
    private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
- private static final int CURRENT_VERSION = 2;
+ public static final int CURRENT_VERSION = 2;
public final int version;
public final UUID processId;
@@ -44,7 +43,7 @@ public class SubscriptionInfo {
        this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
}
-    private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
+    public SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
this.version = version;
this.processId = processId;
this.prevTasks = prevTasks;
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 50ab117..832883a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -96,7 +96,7 @@ public class KafkaStreamsTest {
}
@Test
- public void testStateCloseAfterCreate() throws Exception {
+ public void testStateCloseAfterCreate() {
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -160,7 +160,7 @@ public class KafkaStreamsTest {
// make sure we have the global state thread running too
builder.globalTable("anyTopic", "anyStore");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
- final KafkaStreams streams = new KafkaStreams(builder, props);
+ new KafkaStreams(builder, props);
testStateThreadCloseHelper(numThreads);
}
@@ -200,9 +200,8 @@ public class KafkaStreamsTest {
}
-
@Test
- public void testInitializesAndDestroysMetricsReporters() throws Exception {
+ public void testInitializesAndDestroysMetricsReporters() {
final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -217,7 +216,7 @@ public class KafkaStreamsTest {
}
@Test
- public void testCloseIsIdempotent() throws Exception {
+ public void testCloseIsIdempotent() {
streams.close();
final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
@@ -227,7 +226,7 @@ public class KafkaStreamsTest {
}
@Test(expected = IllegalStateException.class)
- public void testCannotStartOnceClosed() throws Exception {
+ public void testCannotStartOnceClosed() {
streams.start();
streams.close();
try {
@@ -241,7 +240,7 @@ public class KafkaStreamsTest {
}
@Test(expected = IllegalStateException.class)
- public void testCannotStartTwice() throws Exception {
+ public void testCannotStartTwice() {
streams.start();
try {
@@ -267,10 +266,10 @@ public class KafkaStreamsTest {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+       props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
        props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
final KStreamBuilder builder = new KStreamBuilder();
- final KafkaStreams streams = new KafkaStreams(builder, props);
-
+ new KafkaStreams(builder, props);
}
@Test
@@ -278,6 +277,7 @@ public class KafkaStreamsTest {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+       props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
        props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
final KStreamBuilder builder1 = new KStreamBuilder();
final KafkaStreams streams1 = new KafkaStreams(builder1, props);
@@ -285,27 +285,26 @@ public class KafkaStreamsTest {
        props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString());
final KStreamBuilder builder2 = new KStreamBuilder();
- final KafkaStreams streams2 = new KafkaStreams(builder2, props);
-
+ new KafkaStreams(builder2, props);
}
@Test(expected = IllegalStateException.class)
- public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
+ public void shouldNotGetAllTasksWhenNotRunning() {
streams.allMetadata();
}
@Test(expected = IllegalStateException.class)
-    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
+    public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
streams.allMetadataForStore("store");
}
@Test(expected = IllegalStateException.class)
-    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
+    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
streams.metadataForKey("store", "key", Serdes.String().serializer());
}
@Test(expected = IllegalStateException.class)
-    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
+    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
        streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
            @Override
            public Integer partition(final String key, final Object value, final int numPartitions) {
@@ -321,6 +320,7 @@ public class KafkaStreamsTest {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+       props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final KStreamBuilder builder = new KStreamBuilder();
@@ -366,16 +366,18 @@ public class KafkaStreamsTest {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+       props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
final KStreamBuilder builder = new KStreamBuilder();
return new KafkaStreams(builder, props);
}
@Test
- public void testCleanup() throws Exception {
+ public void testCleanup() {
final Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+       props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -391,6 +393,7 @@ public class KafkaStreamsTest {
final Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+       props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -448,6 +451,5 @@ public class KafkaStreamsTest {
streams.close();
}
}
-
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 15cc1af..ab8701f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -60,7 +60,7 @@ public class StreamsConfigTest {
}
@Test
- public void testGetProducerConfigs() throws Exception {
+ public void testGetProducerConfigs() {
        Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client");
        assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), "client-producer");
        assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100");
@@ -68,7 +68,7 @@ public class StreamsConfigTest {
}
@Test
- public void testGetConsumerConfigs() throws Exception {
+ public void testGetConsumerConfigs() {
        Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, "example-application", "client");
        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
        assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application");
@@ -77,7 +77,7 @@ public class StreamsConfigTest {
}
@Test
- public void testGetRestoreConsumerConfigs() throws Exception {
+ public void testGetRestoreConsumerConfigs() {
        Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client");
        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
@@ -86,7 +86,7 @@ public class StreamsConfigTest {
@Test
public void defaultSerdeShouldBeConfigured() {
- Map<String, Object> serializerConfigs = new HashMap<String, Object>();
+ Map<String, Object> serializerConfigs = new HashMap<>();
serializerConfigs.put("key.serializer.encoding", "UTF8");
serializerConfigs.put("value.serializer.encoding", "UTF-16");
Serializer<String> serializer = Serdes.String().serializer();
@@ -117,7 +117,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedConsumerConfigs() throws Exception {
+ public void shouldSupportPrefixedConsumerConfigs() {
        props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
        props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -127,7 +127,7 @@ public class StreamsConfigTest {
}
@Test
-    public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception {
+    public void shouldSupportPrefixedRestoreConsumerConfigs() {
        props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
        props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -137,7 +137,7 @@ public class StreamsConfigTest {
}
@Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
        final StreamsConfig streamsConfig = new StreamsConfig(props);
        props.put(consumerPrefix("interceptor.statsd.host"), "host");
        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
@@ -145,7 +145,7 @@ public class StreamsConfigTest {
}
@Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() {
        final StreamsConfig streamsConfig = new StreamsConfig(props);
        props.put(consumerPrefix("interceptor.statsd.host"), "host");
        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
@@ -153,7 +153,7 @@ public class StreamsConfigTest {
}
@Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
        final StreamsConfig streamsConfig = new StreamsConfig(props);
        props.put(producerPrefix("interceptor.statsd.host"), "host");
        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
@@ -162,7 +162,7 @@ public class StreamsConfigTest {
@Test
- public void shouldSupportPrefixedProducerConfigs() throws Exception {
+ public void shouldSupportPrefixedProducerConfigs() {
props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
        props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -172,7 +172,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception {
+ public void shouldBeSupportNonPrefixedConsumerConfigs() {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -182,7 +182,7 @@ public class StreamsConfigTest {
}
@Test
-    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception {
+    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -192,7 +192,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportNonPrefixedProducerConfigs() throws Exception {
+ public void shouldSupportNonPrefixedProducerConfigs() {
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -201,24 +201,22 @@ public class StreamsConfigTest {
        assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
}
-
-
@Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() throws Exception {
+    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() {
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.keySerde();
}
@Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() throws Exception {
+    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() {
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.valueSerde();
}
@Test
-    public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception {
+    public void shouldOverrideStreamsDefaultConsumerConfigs() {
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -228,7 +226,7 @@ public class StreamsConfigTest {
}
@Test
-    public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
+    public void shouldOverrideStreamsDefaultProducerConfigs() {
        props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
        final StreamsConfig streamsConfig = new StreamsConfig(props);
        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("client");
@@ -236,7 +234,7 @@ public class StreamsConfigTest {
}
@Test
-    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throws Exception {
+    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -246,14 +244,14 @@ public class StreamsConfigTest {
}
@Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws Exception {
+    public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() {
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.getConsumerConfigs(null, "a", "b");
}
@Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws Exception {
+    public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() {
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.getRestoreConsumerConfigs("client");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 1d2a3e2..1256824 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.TestUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -112,6 +113,7 @@ public class FanoutIntegrationTest {
final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+       streamsConfiguration.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 6a8c7ff..3e0d80a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -10,6 +10,7 @@
*/
package org.apache.kafka.streams.integration;
+import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -44,8 +45,6 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
-import kafka.utils.MockTime;
-
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -127,7 +126,7 @@ public class KStreamAggregationDedupIntegrationTest {
List<KeyValue<String, String>> results = receiveMessages(
new StringDeserializer(),
new StringDeserializer(),
- 5);
+ 5);
Collections.sort(results, new Comparator<KeyValue<String, String>>() {
@Override
@@ -177,7 +176,7 @@ public class KStreamAggregationDedupIntegrationTest {
List<KeyValue<String, String>> windowedOutput = receiveMessages(
new StringDeserializer(),
new StringDeserializer(),
- 10);
+ 10);
Comparator<KeyValue<String, String>>
comparator =
@@ -229,7 +228,7 @@ public class KStreamAggregationDedupIntegrationTest {
final List<KeyValue<String, Long>> results = receiveMessages(
new StringDeserializer(),
new LongDeserializer(),
- 5);
+ 5);
Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
@Override
            public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
@@ -303,6 +302,4 @@ public class KStreamAggregationDedupIntegrationTest {
}
-
-
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index bd5911d..13124f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -155,7 +155,7 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValue<String, String>> results = receiveMessages(
new StringDeserializer(),
new StringDeserializer(),
- 10);
+ 10);
Collections.sort(results, new Comparator<KeyValue<String, String>>() {
@Override
@@ -209,7 +209,7 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValue<String, String>> windowedOutput = receiveMessages(
new StringDeserializer(),
new StringDeserializer(),
- 15);
+ 15);
final Comparator<KeyValue<String, String>>
comparator =
@@ -263,7 +263,7 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValue<String, Integer>> results = receiveMessages(
new StringDeserializer(),
new IntegerDeserializer(),
- 10);
+ 10);
Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
@Override
@@ -313,7 +313,7 @@ public class KStreamAggregationIntegrationTest {
        final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
new StringDeserializer(),
new IntegerDeserializer(),
- 15);
+ 15);
final Comparator<KeyValue<String, Integer>>
comparator =
@@ -364,7 +364,7 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValue<String, Long>> results = receiveMessages(
new StringDeserializer(),
new LongDeserializer(),
- 10);
+ 10);
Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
@Override
            public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
@@ -406,7 +406,7 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValue<String, Long>> results = receiveMessages(
new StringDeserializer(),
new LongDeserializer(),
- 10);
+ 10);
Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
@Override
            public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 64e8459..d09d505 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -129,7 +129,7 @@ public class QueryableStateIntegrationTest {
}
@Before
- public void before() throws IOException, InterruptedException {
+ public void before() throws Exception {
testNo++;
createTopics();
streamsConfiguration = new Properties();
@@ -609,15 +609,13 @@ public class QueryableStateIntegrationTest {
     * @param failIfKeyNotFound if true, the test fails if an expected key is not found in store. If false,
     *                          the method merely inserts the new found key into the list of expected keys.
-    * @throws InterruptedException
     */
    private void verifyGreaterOrEqual(final String[] keys,
                                      final Map<String, Long> expectedWindowedCount,
                                      final Map<String, Long> expectedCount,
                                      final ReadOnlyWindowStore<String, Long> windowStore,
                                      final ReadOnlyKeyValueStore<String, Long> keyValueStore,
-                                     final boolean failIfKeyNotFound)
-        throws InterruptedException {
+                                     final boolean failIfKeyNotFound) {
final Map<String, Long> windowState = new HashMap<>();
final Map<String, Long> countState = new HashMap<>();
@@ -744,5 +742,4 @@ public class QueryableStateIntegrationTest {
}
}
-
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 6503038..e06ed73 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -111,6 +112,7 @@ public class StreamPartitionAssignorTest {
        {
            setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
            setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint);
+           setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
            setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
            setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
}
@@ -119,7 +121,7 @@ public class StreamPartitionAssignorTest {
@SuppressWarnings("unchecked")
@Test
- public void testSubscription() throws Exception {
+ public void testSubscription() {
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@@ -159,7 +161,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignBasic() throws Exception {
+ public void testAssignBasic() {
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@@ -227,7 +229,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithPartialTopology() throws Exception {
+ public void testAssignWithPartialTopology() {
Properties props = configProps();
        props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class);
StreamsConfig config = new StreamsConfig(props);
@@ -267,7 +269,7 @@ public class StreamPartitionAssignorTest {
@Test
- public void testAssignEmptyMetadata() throws Exception {
+ public void testAssignEmptyMetadata() {
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@@ -324,7 +326,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithNewTasks() throws Exception {
+ public void testAssignWithNewTasks() {
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
builder.addSource("source3", "topic3");
@@ -381,7 +383,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithStates() throws Exception {
+ public void testAssignWithStates() {
String applicationId = "test";
builder.setApplicationId(applicationId);
builder.addSource("source1", "topic1");
@@ -470,7 +472,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithStandbyReplicas() throws Exception {
+ public void testAssignWithStandbyReplicas() {
Properties props = configProps();
props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
StreamsConfig config = new StreamsConfig(props);
@@ -543,7 +545,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testOnAssignment() throws Exception {
+ public void testOnAssignment() {
TopicPartition t2p3 = new TopicPartition("topic2", 3);
TopologyBuilder builder = new TopologyBuilder();
@@ -576,7 +578,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithInternalTopics() throws Exception {
+ public void testAssignWithInternalTopics() {
String applicationId = "test";
builder.setApplicationId(applicationId);
builder.addInternalTopic("topicX");
@@ -612,7 +614,7 @@ public class StreamPartitionAssignorTest {
}
@Test
-    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception {
+    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
String applicationId = "test";
builder.setApplicationId(applicationId);
builder.addInternalTopic("topicX");
@@ -650,7 +652,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
+ public void shouldAddUserDefinedEndPointToSubscription() {
final Properties properties = configProps();
        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
final StreamsConfig config = new StreamsConfig(properties);
@@ -663,8 +665,8 @@ public class StreamPartitionAssignorTest {
final UUID uuid1 = UUID.randomUUID();
final String client1 = "client1";
-        final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+        final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1,
+            uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
        final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
@@ -673,7 +675,80 @@ public class StreamPartitionAssignorTest {
}
    @Test
-    public void shouldMapUserEndPointToTopicPartitions() throws Exception {
+    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() {
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.emptySet();
+        subscriptions.put(
+            "consumer1",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+            )
+        );
+        subscriptions.put(
+            "consumer2",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+            )
+        );
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        final TopologyBuilder builder = new TopologyBuilder();
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "appId",
+            "clientId",
+            UUID.randomUUID(),
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
+
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, "test", "clientId"));
+        final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+
+        assertEquals(2, assignment.size());
+        assertEquals(1, AssignmentInfo.decode(assignment.get("consumer1").userData()).version);
+        assertEquals(1, AssignmentInfo.decode(assignment.get("consumer2").userData()).version);
+    }
+
+    @Test
+    public void shouldDownGradeSubscription() {
+        final Properties properties = configProps();
+        properties.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100);
+        StreamsConfig config = new StreamsConfig(properties);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+
+        String clientId = "client-id";
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "appId",
+            "clientId",
+            UUID.randomUUID(),
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, "test", clientId));
+
+        PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
+
+        assertEquals(1, SubscriptionInfo.decode(subscription.userData()).version);
+    }
+
+ @Test
+ public void shouldMapUserEndPointToTopicPartitions() {
final Properties properties = configProps();
final String myEndPoint = "localhost:8080";
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -711,7 +786,7 @@ public class StreamPartitionAssignorTest {
}
@Test
-    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
+    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
final Properties properties = configProps();
final String myEndPoint = "localhost";
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -736,7 +811,7 @@ public class StreamPartitionAssignorTest {
}
@Test
-    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
+    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
final Properties properties = configProps();
final String myEndPoint = "localhost:j87yhk";
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -760,7 +835,7 @@ public class StreamPartitionAssignorTest {
}
@Test
-    public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
+    public void shouldExposeHostStateToTopicPartitionsOnAssignment() {
        List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
        final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(new HostInfo("localhost", 80),
@@ -773,7 +848,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldSetClusterMetadataOnAssignment() throws Exception {
+ public void shouldSetClusterMetadataOnAssignment() {
        final List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
        final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(new HostInfo("localhost", 80),
@@ -793,7 +868,7 @@ public class StreamPartitionAssignorTest {
}
@Test
-    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception {
+    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() {
final Cluster cluster = partitionAssignor.clusterMetadata();
assertNotNull(cluster);
}
@@ -891,11 +966,11 @@ public class StreamPartitionAssignorTest {
new TopicPartition(applicationId + "-count-repartition", 1),
new TopicPartition(applicationId + "-count-repartition", 2)
);
-        assertThat(new HashSet(assignment.get(client).partitions()), equalTo(new HashSet(expectedAssignment)));
+        assertThat(new HashSet<>(assignment.get(client).partitions()), equalTo(new HashSet<>(expectedAssignment)));
}
@Test
-    public void shouldUpdatePartitionHostInfoMapOnAssignment() throws Exception {
+    public void shouldUpdatePartitionHostInfoMapOnAssignment() {
        final TopicPartition partitionOne = new TopicPartition("topic", 1);
        final TopicPartition partitionTwo = new TopicPartition("topic", 2);
        final Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(
@@ -912,7 +987,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldUpdateClusterMetadataOnAssignment() throws Exception {
+ public void shouldUpdateClusterMetadataOnAssignment() {
final TopicPartition topicOne = new TopicPartition("topic", 1);
final TopicPartition topicTwo = new TopicPartition("topic2", 2);
        final Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(
@@ -928,7 +1003,7 @@ public class StreamPartitionAssignorTest {
}
@Test
-    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
+    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
final Properties props = configProps();
props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
final StreamsConfig config = new StreamsConfig(props);
@@ -976,12 +1051,12 @@ public class StreamPartitionAssignorTest {
}
@Test(expected = KafkaException.class)
-    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() throws Exception {
+    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() {
        partitionAssignor.configure(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
}
@Test(expected = KafkaException.class)
-    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() throws Exception {
+    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() {
final Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
config.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, "i am
not a stream thread");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index cfa0e61..52c753d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -65,10 +65,9 @@ public class AssignmentInfoTest {
assertEquals(oldVersion.activeTasks, decoded.activeTasks);
assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
        assertEquals(0, decoded.partitionsByHost.size()); // should be empty as it wasn't in V1
-        assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
+        assertEquals(1, decoded.version);
}
-
/**
* This is a clone of what the V1 encoding did. The encode method has
changed for V2
* so it is impossible to test compatibility without having this
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index d192126..9f59b11 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -115,7 +115,7 @@ public class SmokeTestClient extends SmokeTestUtil {
}
});
- data.process(SmokeTestUtil.printProcessorSupplier("data"));
+        data.process(SmokeTestUtil.<String, Integer>printProcessorSupplier("data"));
// min
KGroupedStream<String, Integer>
@@ -141,7 +141,7 @@ public class SmokeTestClient extends SmokeTestUtil {
).to(stringSerde, intSerde, "min");
KTable<String, Integer> minTable = builder.table(stringSerde,
intSerde, "min", "minStoreName");
-        minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"));
+        minTable.toStream().process(SmokeTestUtil.<String, Integer>printProcessorSupplier("min"));
// max
groupedData.aggregate(
@@ -163,7 +163,7 @@ public class SmokeTestClient extends SmokeTestUtil {
).to(stringSerde, intSerde, "max");
KTable<String, Integer> maxTable = builder.table(stringSerde,
intSerde, "max", "maxStoreName");
-        maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"));
+        maxTable.toStream().process(SmokeTestUtil.<String, Integer>printProcessorSupplier("max"));
// sum
groupedData.aggregate(
@@ -186,7 +186,7 @@ public class SmokeTestClient extends SmokeTestUtil {
KTable<String, Long> sumTable = builder.table(stringSerde, longSerde,
"sum", "sumStoreName");
-        sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"));
+        sumTable.toStream().process(SmokeTestUtil.<String, Long>printProcessorSupplier("sum"));
// cnt
groupedData.count(TimeWindows.of(TimeUnit.DAYS.toMillis(2)),
"uwin-cnt")
@@ -195,7 +195,7 @@ public class SmokeTestClient extends SmokeTestUtil {
).to(stringSerde, longSerde, "cnt");
KTable<String, Long> cntTable = builder.table(stringSerde, longSerde,
"cnt", "cntStoreName");
-        cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"));
+        cntTable.toStream().process(SmokeTestUtil.<String, Long>printProcessorSupplier("cnt"));
// dif
maxTable.join(minTable,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index c2cfd84..a0c2933 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -77,7 +77,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
    // This main() is not used by the system test. It is intended to be used for local debugging.
public static void main(String[] args) throws Exception {
final String kafka = "localhost:9092";
- final String zookeeper = "localhost:2181";
final File stateDir = TestUtils.tempDirectory();
final int numKeys = 20;
@@ -131,42 +130,50 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
    public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception {
+        return generate(kafka, numKeys, maxRecordsPerKey, true);
+    }
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int maxRecordsPerKey,
+                                                     final boolean autoTerminate) throws Exception {
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
-        // the next 4 config values make sure that all records are produced with no loss and
-        // no duplicates
+        // the next 2 config values make sure that all records are produced with no loss and no duplicates
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
int numRecordsProduced = 0;
- Map<String, Set<Integer>> allData = new HashMap<>();
- ValueList[] data = new ValueList[numKeys];
+ final Map<String, Set<Integer>> allData = new HashMap<>();
+ final ValueList[] data = new ValueList[numKeys];
for (int i = 0; i < numKeys; i++) {
data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
allData.put(data[i].key, new HashSet<Integer>());
}
- Random rand = new Random();
+ final Random rand = new Random();
- int remaining = data.length;
+        int remaining = 1; // dummy value must be positive if <autoTerminate> is false
+ if (autoTerminate) {
+ remaining = data.length;
+ }
while (remaining > 0) {
- int index = rand.nextInt(remaining);
- String key = data[index].key;
+            final int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys);
+ final String key = data[index].key;
int value = data[index].next();
- if (value < 0) {
+ if (autoTerminate && value < 0) {
remaining--;
data[index] = data[remaining];
} else {
-                ProducerRecord<byte[], byte[]> record =
-                    new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
+                final ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
producer.send(record, new Callback() {
@Override
@@ -178,11 +185,12 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
});
-
numRecordsProduced++;
allData.get(key).add(value);
-                if (numRecordsProduced % 100 == 0)
+
+                if (numRecordsProduced % 100 == 0) {
                    System.out.println(numRecordsProduced + " records produced");
+                }
Utils.sleep(2);
}
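
As an aside, the new four-argument `generate()` overload above can be exercised directly for local debugging; a minimal sketch, assuming a local broker at localhost:9092 (the class name `EndlessGenerateExample` and the constants are illustrative, not part of this patch):

    // Hypothetical local driver: with autoTerminate=false the generator keeps
    // producing values for random keys forever, so the returned map is not
    // used for verification in this mode.
    public class EndlessGenerateExample {
        public static void main(final String[] args) throws Exception {
            SmokeTestDriver.generate("localhost:9092", 20, 1000, false);
        }
    }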
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 73fe27c..87ab60c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -33,8 +33,6 @@ import java.io.File;
public class SmokeTestUtil {
- public final static int WINDOW_SIZE = 100;
- public final static long START_TIME = 60000L * 60 * 24 * 365 * 30;
public final static int END = Integer.MAX_VALUE;
public static ProcessorSupplier<Object, Object>
printProcessorSupplier(final String topic) {
@@ -46,18 +44,15 @@ public class SmokeTestUtil {
public Processor<Object, Object> get() {
return new AbstractProcessor<Object, Object>() {
private int numRecordsProcessed = 0;
- private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
System.out.println("initializing processor: topic=" +
topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
- this.context = context;
}
@Override
public void process(Object key, Object value) {
- if (printOffset) System.out.println(">>> " +
context.offset());
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " +
numRecordsProcessed + " records from topic=" + topic);
@@ -65,12 +60,10 @@ public class SmokeTestUtil {
}
@Override
- public void punctuate(long timestamp) {
- }
+ public void punctuate(long timestamp) {}
@Override
- public void close() {
- }
+ public void close() {}
};
}
};
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index 304cae7..aa1def1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -24,7 +24,7 @@ import java.util.Set;
public class StreamsSmokeTest {
/**
-     * args ::= command kafka zookeeper stateDir
+     * args ::= kafka stateDir command disableAutoTerminate
* command := "run" | "process"
*
* @param args
@@ -33,11 +33,13 @@ public class StreamsSmokeTest {
String kafka = args[0];
String stateDir = args.length > 1 ? args[1] : null;
String command = args.length > 2 ? args[2] : null;
+ boolean disableAutoTerminate = args.length > 3;
- System.out.println("StreamsTest instance started");
+ System.out.println("StreamsTest instance started (StreamsSmokeTest)");
System.out.println("command=" + command);
System.out.println("kafka=" + kafka);
System.out.println("stateDir=" + stateDir);
+ System.out.println("disableAutoTerminate=" + disableAutoTerminate);
switch (command) {
case "standalone":
@@ -47,8 +49,12 @@ public class StreamsSmokeTest {
// this starts the driver (data generation and result
verification)
final int numKeys = 10;
final int maxRecordsPerKey = 500;
-                Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
-                SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                if (disableAutoTerminate) {
+                    SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false);
+                } else {
+                    Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
+                    SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                }
break;
case "process":
// this starts a KafkaStreams client
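
As the parsing above shows, the presence of any fourth command-line argument is what flips `disableAutoTerminate` to true; a minimal sketch of an equivalent programmatic launch (broker address and state directory are illustrative assumptions):

    // Hypothetical launcher: argument order matches the parsing above
    // (args[0]=kafka, args[1]=stateDir, args[2]=command, args[3]=flag).
    public class SmokeTestLauncherExample {
        public static void main(final String[] args) throws Exception {
            StreamsSmokeTest.main(new String[] {
                "localhost:9092",       // kafka
                "/tmp/smoke-state",     // stateDir
                "run",                  // command: start the data-generating driver
                "disableAutoTerminate"  // any 4th argument disables auto-termination
            });
        }
    }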
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..17ff97e
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+ @SuppressWarnings("unchecked")
+ public static void main(final String[] args) {
+ if (args.length < 2) {
+ System.err.println("StreamsUpgradeTest requires two argument
(kafka-url, state-dir, [upgradeFrom: optional]) but only " + args.length + "
provided: "
+ + (args.length > 0 ? args[0] : ""));
+ }
+ final String kafka = args[0];
+ final String stateDir = args[1];
+ final String upgradeFrom = args.length > 2 ? args[2] : null;
+
+ System.out.println("StreamsTest instance started (StreamsUpgradeTest
trunk)");
+ System.out.println("kafka=" + kafka);
+ System.out.println("stateDir=" + stateDir);
+ System.out.println("upgradeFrom=" + upgradeFrom);
+
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ final KStream dataStream = builder.stream("data");
+ dataStream.process(SmokeTestUtil.printProcessorSupplier("data"));
+ dataStream.to("echo");
+
+ final Properties config = new Properties();
+ config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest");
+ config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+ config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+ if (upgradeFrom != null) {
+ config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
+ }
+
+
+ final KafkaStreams streams = new KafkaStreams(builder, config);
+ streams.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ System.out.println("closing Kafka Streams instance");
+ System.out.flush();
+ streams.close();
+ System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+ System.out.flush();
+ }
+ });
+ }
+}
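
For context, this is how an application would be expected to use the new `upgrade.from` parameter across the two rolling bounces (a minimal sketch; the application id and broker address are illustrative assumptions, and the procedure mirrors the system test added below):

    // First rolling bounce: run the new jars, but keep sending the old
    // subscription metadata format so 0.10.0 and newer instances can coexist.
    final Properties firstBounce = new Properties();
    firstBounce.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
    firstBounce.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    firstBounce.put(StreamsConfig.UPGRADE_FROM_CONFIG, "0.10.0");

    // Second rolling bounce: drop upgrade.from so the whole group switches
    // over to the new metadata version.
    final Properties secondBounce = new Properties();
    secondBounce.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
    secondBounce.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");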
diff --git
a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..7d3ed43
--- /dev/null
+++
b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+ @SuppressWarnings("unchecked")
+ public static void main(final String[] args) {
+ if (args.length < 3) {
+ System.err.println("StreamsUpgradeTest requires three argument
(kafka-url, zookeeper-url, state-dir) but only " + args.length + " provided: "
+ + (args.length > 0 ? args[0] + " " : "")
+ + (args.length > 1 ? args[1] : ""));
+ }
+ final String kafka = args[0];
+ final String zookeeper = args[1];
+ final String stateDir = args[2];
+
+ System.out.println("StreamsTest instance started (StreamsUpgradeTest
v0.10.0)");
+ System.out.println("kafka=" + kafka);
+ System.out.println("zookeeper=" + zookeeper);
+ System.out.println("stateDir=" + stateDir);
+
+ final KStreamBuilder builder = new KStreamBuilder();
+ final KStream dataStream = builder.stream("data");
+ dataStream.process(printProcessorSupplier());
+ dataStream.to("echo");
+
+ final Properties config = new Properties();
+ config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest");
+ config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+ config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+ config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+
+ final KafkaStreams streams = new KafkaStreams(builder, config);
+ streams.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ System.out.println("closing Kafka Streams instance");
+ System.out.flush();
+ streams.close();
+ System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+ System.out.flush();
+ }
+ });
+ }
+
+ private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+ return new ProcessorSupplier<K, V>() {
+ public Processor<K, V> get() {
+ return new AbstractProcessor<K, V>() {
+ private int numRecordsProcessed = 0;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ System.out.println("initializing processor: topic=data
taskId=" + context.taskId());
+ numRecordsProcessed = 0;
+ }
+
+ @Override
+ public void process(final K key, final V value) {
+ numRecordsProcessed++;
+ if (numRecordsProcessed % 100 == 0) {
+ System.out.println("processed " +
numRecordsProcessed + " records from topic=data");
+ }
+ }
+
+ @Override
+ public void punctuate(final long timestamp) {}
+
+ @Override
+ public void close() {}
+ };
+ }
+ };
+ }
+}
diff --git
a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..604fbe7
--- /dev/null
+++
b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+ /**
+     * This test cannot be executed as long as Kafka 0.10.1.2 is not released.
+ */
+ @SuppressWarnings("unchecked")
+ public static void main(final String[] args) {
+ if (args.length < 3) {
+ System.err.println("StreamsUpgradeTest requires three argument
(kafka-url, zookeeper-url, state-dir, [upgradeFrom: optional]) but only " +
args.length + " provided: "
+ + (args.length > 0 ? args[0] + " " : "")
+ + (args.length > 1 ? args[1] : ""));
+ }
+ String kafka = args[0];
+ String zookeeper = args[1];
+ String stateDir = args[2];
+ String upgradeFrom = args.length > 3 ? args[3] : null;
+
+ System.out.println("StreamsTest instance started (StreamsUpgradeTest
v0.10.1)");
+ System.out.println("kafka=" + kafka);
+ System.out.println("zookeeper=" + zookeeper);
+ System.out.println("stateDir=" + stateDir);
+ System.out.println("upgradeFrom=" + upgradeFrom);
+
+ final KStreamBuilder builder = new KStreamBuilder();
+ KStream dataStream = builder.stream("data");
+ dataStream.process(printProcessorSupplier());
+ dataStream.to("echo");
+
+ final Properties config = new Properties();
+ config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest");
+ config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+ config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+ config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+ if (upgradeFrom != null) {
+            // TODO: Kafka 0.10.1.2 is not released yet, so `UPGRADE_FROM_CONFIG` is not available; use the literal key instead
+            //config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
+ config.setProperty("upgrade.from", upgradeFrom);
+ }
+
+ final KafkaStreams streams = new KafkaStreams(builder, config);
+ streams.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ System.out.println("closing Kafka Streams instance");
+ System.out.flush();
+ streams.close();
+ System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+ System.out.flush();
+ }
+ });
+ }
+
+ private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+ return new ProcessorSupplier<K, V>() {
+ public Processor<K, V> get() {
+ return new AbstractProcessor<K, V>() {
+ private int numRecordsProcessed = 0;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ System.out.println("initializing processor: topic=data
taskId=" + context.taskId());
+ numRecordsProcessed = 0;
+ }
+
+ @Override
+ public void process(final K key, final V value) {
+ numRecordsProcessed++;
+ if (numRecordsProcessed % 100 == 0) {
+ System.out.println("processed " +
numRecordsProcessed + " records from topic=data");
+ }
+ }
+
+ @Override
+ public void punctuate(final long timestamp) {}
+
+ @Override
+ public void close() {}
+ };
+ }
+ };
+ }
+}
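
Note that the literal key `"upgrade.from"` set above is the same key the `StreamsConfig.UPGRADE_FROM_CONFIG` constant carries; once a 0.10.1 bugfix release containing the constant exists, the commented-out call becomes usable (a one-line sketch under that assumption):

    // Equivalent call once StreamsConfig.UPGRADE_FROM_CONFIG is available:
    config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);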
diff --git a/tests/kafkatest/services/streams.py
b/tests/kafkatest/services/streams.py
index e7be947..b7de568 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -20,6 +20,7 @@ from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1
class StreamsTestBaseService(KafkaPathResolverMixin, Service):
@@ -33,6 +34,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
+ CLEAN_NODE_ENABLED = True
+
logs = {
"streams_log": {
"path": LOG_FILE,
@@ -43,6 +46,114 @@ class StreamsTestBaseService(KafkaPathResolverMixin,
Service):
"streams_stderr": {
"path": STDERR_FILE,
"collect_default": True},
+ "streams_log.0-1": {
+ "path": LOG_FILE + ".0-1",
+ "collect_default": True},
+ "streams_stdout.0-1": {
+ "path": STDOUT_FILE + ".0-1",
+ "collect_default": True},
+ "streams_stderr.0-1": {
+ "path": STDERR_FILE + ".0-1",
+ "collect_default": True},
+ "streams_log.0-2": {
+ "path": LOG_FILE + ".0-2",
+ "collect_default": True},
+ "streams_stdout.0-2": {
+ "path": STDOUT_FILE + ".0-2",
+ "collect_default": True},
+ "streams_stderr.0-2": {
+ "path": STDERR_FILE + ".0-2",
+ "collect_default": True},
+ "streams_log.0-3": {
+ "path": LOG_FILE + ".0-3",
+ "collect_default": True},
+ "streams_stdout.0-3": {
+ "path": STDOUT_FILE + ".0-3",
+ "collect_default": True},
+ "streams_stderr.0-3": {
+ "path": STDERR_FILE + ".0-3",
+ "collect_default": True},
+ "streams_log.0-4": {
+ "path": LOG_FILE + ".0-4",
+ "collect_default": True},
+ "streams_stdout.0-4": {
+ "path": STDOUT_FILE + ".0-4",
+ "collect_default": True},
+ "streams_stderr.0-4": {
+ "path": STDERR_FILE + ".0-4",
+ "collect_default": True},
+ "streams_log.0-5": {
+ "path": LOG_FILE + ".0-5",
+ "collect_default": True},
+ "streams_stdout.0-5": {
+ "path": STDOUT_FILE + ".0-5",
+ "collect_default": True},
+ "streams_stderr.0-5": {
+ "path": STDERR_FILE + ".0-5",
+ "collect_default": True},
+ "streams_log.0-6": {
+ "path": LOG_FILE + ".0-6",
+ "collect_default": True},
+ "streams_stdout.0-6": {
+ "path": STDOUT_FILE + ".0-6",
+ "collect_default": True},
+ "streams_stderr.0-6": {
+ "path": STDERR_FILE + ".0-6",
+ "collect_default": True},
+ "streams_log.1-1": {
+ "path": LOG_FILE + ".1-1",
+ "collect_default": True},
+ "streams_stdout.1-1": {
+ "path": STDOUT_FILE + ".1-1",
+ "collect_default": True},
+ "streams_stderr.1-1": {
+ "path": STDERR_FILE + ".1-1",
+ "collect_default": True},
+ "streams_log.1-2": {
+ "path": LOG_FILE + ".1-2",
+ "collect_default": True},
+ "streams_stdout.1-2": {
+ "path": STDOUT_FILE + ".1-2",
+ "collect_default": True},
+ "streams_stderr.1-2": {
+ "path": STDERR_FILE + ".1-2",
+ "collect_default": True},
+ "streams_log.1-3": {
+ "path": LOG_FILE + ".1-3",
+ "collect_default": True},
+ "streams_stdout.1-3": {
+ "path": STDOUT_FILE + ".1-3",
+ "collect_default": True},
+ "streams_stderr.1-3": {
+ "path": STDERR_FILE + ".1-3",
+ "collect_default": True},
+ "streams_log.1-4": {
+ "path": LOG_FILE + ".1-4",
+ "collect_default": True},
+ "streams_stdout.1-4": {
+ "path": STDOUT_FILE + ".1-4",
+ "collect_default": True},
+ "streams_stderr.1-4": {
+ "path": STDERR_FILE + ".1-4",
+ "collect_default": True},
+ "streams_log.1-5": {
+ "path": LOG_FILE + ".1-5",
+ "collect_default": True},
+ "streams_stdout.1-5": {
+ "path": STDOUT_FILE + ".1-5",
+ "collect_default": True},
+ "streams_stderr.1-5": {
+ "path": STDERR_FILE + ".1-5",
+ "collect_default": True},
+ "streams_log.1-6": {
+ "path": LOG_FILE + ".1-6",
+ "collect_default": True},
+ "streams_stdout.1-6": {
+ "path": STDOUT_FILE + ".1-6",
+ "collect_default": True},
+ "streams_stderr.1-6": {
+ "path": STDERR_FILE + ".1-6",
+ "collect_default": True},
}
def __init__(self, test_context, kafka, streams_class_name,
user_test_args, user_test_args1=None, user_test_args2=None):
@@ -107,7 +218,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin,
Service):
def clean_node(self, node):
node.account.kill_process("streams", clean_shutdown=False,
allow_fail=True)
- node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+ if self.CLEAN_NODE_ENABLED:
+ node.account.ssh("rm -rf " + self.PERSISTENT_ROOT,
allow_fail=False)
def start_cmd(self, node):
args = self.args.copy()
@@ -153,7 +265,28 @@ class StreamsSmokeTestBaseService(StreamsTestBaseService):
class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsSmokeTestDriverService, self).__init__(test_context,
kafka, "run")
+ self.DISABLE_AUTO_TERMINATE = ""
+
+ def disable_auto_terminate(self):
+ self.DISABLE_AUTO_TERMINATE = "disableAutoTerminate"
+
+ def start_cmd(self, node):
+ args = self.args.copy()
+ args['kafka'] = self.kafka.bootstrap_servers()
+ args['state_dir'] = self.PERSISTENT_ROOT
+ args['stdout'] = self.STDOUT_FILE
+ args['stderr'] = self.STDERR_FILE
+ args['pidfile'] = self.PID_FILE
+ args['log4j'] = self.LOG4J_CONFIG_FILE
+ args['disable_auto_terminate'] = self.DISABLE_AUTO_TERMINATE
+ args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+ cmd = "( export
KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+ "INCLUDE_TEST_JARS=true %(kafka_run_class)s
%(streams_class_name)s " \
+ " %(kafka)s %(state_dir)s %(user_test_args)s
%(disable_auto_terminate)s" \
+ " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s"
% args
+
+ return cmd
class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
@@ -171,3 +304,41 @@ class
StreamsBrokerCompatibilityService(StreamsTestBaseService):
kafka,
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
"dummy")
+
+class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
+ def __init__(self, test_context, kafka):
+        super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context,
+                                                                 kafka,
+                                                                 "org.apache.kafka.streams.tests.StreamsUpgradeTest",
+                                                                 "")
+ self.UPGRADE_FROM = ""
+
+ def set_version(self, kafka_streams_version):
+ self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+ def set_upgrade_from(self, upgrade_from):
+ self.UPGRADE_FROM = upgrade_from
+
+ def start_cmd(self, node):
+ args = self.args.copy()
+ args['kafka'] = self.kafka.bootstrap_servers()
+        if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
+ args['zk'] = self.kafka.zk.connect_setting()
+ else:
+ args['zk'] = ""
+ args['state_dir'] = self.PERSISTENT_ROOT
+ args['stdout'] = self.STDOUT_FILE
+ args['stderr'] = self.STDERR_FILE
+ args['pidfile'] = self.PID_FILE
+ args['log4j'] = self.LOG4J_CONFIG_FILE
+ args['version'] = self.KAFKA_STREAMS_VERSION
+ args['upgrade_from'] = self.UPGRADE_FROM
+ args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+ cmd = "( export
KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+ "INCLUDE_TEST_JARS=true
UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
+ " %(kafka_run_class)s %(streams_class_name)s " \
+ " %(kafka)s %(zk)s %(state_dir)s %(user_test_args)s
%(upgrade_from)s" \
+ " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s"
% args
+
+ return cmd
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py
b/tests/kafkatest/tests/streams/streams_upgrade_test.py
new file mode 100644
index 0000000..294e354
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -0,0 +1,242 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestDriverService,
StreamsUpgradeTestJobRunnerService
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, DEV_VERSION
+import random
+
+class StreamsUpgradeTest(KafkaTest):
+ """
+    Test upgrading Kafka Streams (all version combinations).
+    If the metadata format changed between versions, the upgrade requires two rolling bounces.
+    The metadata version was bumped in 0.10.1.0.
+ """
+
+ def __init__(self, test_context):
+ super(StreamsUpgradeTest, self).__init__(test_context, num_zk=1,
num_brokers=1, topics={
+ 'echo' : { 'partitions': 5 },
+ 'data' : { 'partitions': 5 }
+ })
+
+ self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+ self.driver.disable_auto_terminate()
+ self.processor1 = StreamsUpgradeTestJobRunnerService(test_context,
self.kafka)
+ self.processor2 = StreamsUpgradeTestJobRunnerService(test_context,
self.kafka)
+ self.processor3 = StreamsUpgradeTestJobRunnerService(test_context,
self.kafka)
+
+ def test_simple_upgrade(self):
+ """
+ Starts 3 KafkaStreams instances with version 0.10.1, and upgrades
one-by-one to 0.10.2
+ """
+
+ self.driver.start()
+ self.start_all_nodes_with(str(LATEST_0_10_1))
+
+ self.processors = [self.processor1, self.processor2, self.processor3]
+
+ counter = 1
+ random.seed()
+
+ random.shuffle(self.processors)
+ for p in self.processors:
+ p.CLEAN_NODE_ENABLED = False
+ self.do_rolling_bounce(p, "", str(DEV_VERSION), counter)
+ counter = counter + 1
+
+ # shutdown
+ self.driver.stop()
+ self.driver.wait()
+
+ random.shuffle(self.processors)
+ for p in self.processors:
+ node = p.node
+ with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+ p.stop()
+ monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+ timeout_sec=60,
+ err_msg="Never saw output
'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+
+ self.driver.stop()
+
+    # @parametrize(new_version=str(LATEST_0_10_1))  # disabled: cannot run this test until Kafka 0.10.1.2 is released
+ @parametrize(new_version=str(DEV_VERSION))
+ def test_metadata_upgrade(self, new_version):
+ """
+ Starts 3 KafkaStreams instances with version 0.10.0, and upgrades
one-by-one to <new_version>
+ """
+
+ self.driver.start()
+ self.start_all_nodes_with(str(LATEST_0_10_0))
+
+ self.processors = [self.processor1, self.processor2, self.processor3]
+
+ counter = 1
+ random.seed()
+
+ # first rolling bounce
+ random.shuffle(self.processors)
+ for p in self.processors:
+ p.CLEAN_NODE_ENABLED = False
+ self.do_rolling_bounce(p, "0.10.0", new_version, counter)
+ counter = counter + 1
+
+ # second rolling bounce
+ random.shuffle(self.processors)
+ for p in self.processors:
+ self.do_rolling_bounce(p, "", new_version, counter)
+ counter = counter + 1
+
+ # shutdown
+ self.driver.stop()
+ self.driver.wait()
+
+ random.shuffle(self.processors)
+ for p in self.processors:
+ node = p.node
+ with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+ p.stop()
+ monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+ timeout_sec=60,
+ err_msg="Never saw output
'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+
+ self.driver.stop()
+
+ def start_all_nodes_with(self, version):
+ # start first with <version>
+ self.prepare_for(self.processor1, version)
+ node1 = self.processor1.node
+ with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
+            with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
+ self.processor1.start()
+ log_monitor.wait_until("Kafka version : " + version,
+ timeout_sec=60,
+ err_msg="Could not detect Kafka Streams
version " + version + " " + str(node1.account))
+ monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100
records from topic' on" + str(node1.account))
+
+ # start second with <version>
+ self.prepare_for(self.processor2, version)
+ node2 = self.processor2.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+                with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
+ self.processor2.start()
+ log_monitor.wait_until("Kafka version : " + version,
+ timeout_sec=60,
+ err_msg="Could not detect Kafka
Streams version " + version + " " + str(node2.account))
+ first_monitor.wait_until("processed 100 records from
topic",
+ timeout_sec=60,
+ err_msg="Never saw output
'processed 100 records from topic' on" + str(node1.account))
+ second_monitor.wait_until("processed 100 records from
topic",
+ timeout_sec=60,
+ err_msg="Never saw output
'processed 100 records from topic' on" + str(node2.account))
+
+ # start third with <version>
+ self.prepare_for(self.processor3, version)
+ node3 = self.processor3.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+                with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
+                    with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
+ self.processor3.start()
+ log_monitor.wait_until("Kafka version : " + version,
+ timeout_sec=60,
+ err_msg="Could not detect Kafka
Streams version " + version + " " + str(node3.account))
+ first_monitor.wait_until("processed 100 records from
topic",
+ timeout_sec=60,
+ err_msg="Never saw output
'processed 100 records from topic' on" + str(node1.account))
+ second_monitor.wait_until("processed 100 records from
topic",
+ timeout_sec=60,
+ err_msg="Never saw output
'processed 100 records from topic' on" + str(node2.account))
+ third_monitor.wait_until("processed 100 records from
topic",
+ timeout_sec=60,
+ err_msg="Never saw output
'processed 100 records from topic' on" + str(node3.account))
+
+ @staticmethod
+ def prepare_for(processor, version):
+ processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT,
allow_fail=False)
+ processor.set_version(version)
+
+ def do_rolling_bounce(self, processor, upgrade_from, new_version, counter):
+ first_other_processor = None
+ second_other_processor = None
+ for p in self.processors:
+ if p != processor:
+ if first_other_processor is None:
+ first_other_processor = p
+ else:
+ second_other_processor = p
+
+ node = processor.node
+ first_other_node = first_other_processor.node
+ second_other_node = second_other_processor.node
+
+ # stop processor and wait for rebalance of others
+        with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+            with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+ processor.stop()
+ first_other_monitor.wait_until("processed 100 records from
topic",
+ timeout_sec=60,
+ err_msg="Never saw output
'processed 100 records from topic' on" + str(first_other_node.account))
+ second_other_monitor.wait_until("processed 100 records from
topic",
+ timeout_sec=60,
+ err_msg="Never saw output
'processed 100 records from topic' on" + str(second_other_node.account))
+ node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" %
processor.STDOUT_FILE, allow_fail=False)
+
+ if upgrade_from == "": # upgrade disabled -- second round of rolling
bounces
+ roll_counter = ".1-" # second round of rolling bounces
+ else:
+ roll_counter = ".0-" # first round of rolling boundes
+
+ node.account.ssh("mv " + processor.STDOUT_FILE + " " +
processor.STDOUT_FILE + roll_counter + str(counter), allow_fail=False)
+ node.account.ssh("mv " + processor.STDERR_FILE + " " +
processor.STDERR_FILE + roll_counter + str(counter), allow_fail=False)
+ node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE
+ roll_counter + str(counter), allow_fail=False)
+
+ if new_version == str(DEV_VERSION):
+ processor.set_version("") # set to TRUNK
+ else:
+ processor.set_version(new_version)
+ processor.set_upgrade_from(upgrade_from)
+
+ grep_metadata_error = "grep
\"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
subscription data: version=2\" "
+ with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+ with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+                with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+                    with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+ processor.start()
+
+ log_monitor.wait_until("Kafka version : " +
new_version,
+ timeout_sec=60,
+ err_msg="Could not detect Kafka
Streams version " + new_version + " " + str(node.account))
+ first_other_monitor.wait_until("processed 100 records
from topic",
+ timeout_sec=60,
+ err_msg="Never saw
output 'processed 100 records from topic' on" + str(first_other_node.account))
+ found =
list(first_other_node.account.ssh_capture(grep_metadata_error +
first_other_processor.STDERR_FILE, allow_fail=True))
+ if len(found) > 0:
+ raise Exception("Kafka Streams failed with 'unable
to decode subscription data: version=2'")
+
+ second_other_monitor.wait_until("processed 100 records
from topic",
+ timeout_sec=60,
+ err_msg="Never saw
output 'processed 100 records from topic' on" + str(second_other_node.account))
+ found =
list(second_other_node.account.ssh_capture(grep_metadata_error +
second_other_processor.STDERR_FILE, allow_fail=True))
+ if len(found) > 0:
+ raise Exception("Kafka Streams failed with 'unable
to decode subscription data: version=2'")
+
+ monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output
'processed 100 records from topic' on" + str(node.account))
\ No newline at end of file
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 7cd489d..df95602 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -61,6 +61,7 @@ def get_version(node=None):
return DEV_BRANCH
DEV_BRANCH = KafkaVersion("dev")
+DEV_VERSION = KafkaVersion("0.10.2.2-SNAPSHOT")
# 0.8.2.X versions
V_0_8_2_1 = KafkaVersion("0.8.2.1")
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 0bb0f30..70987c6 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -52,6 +52,8 @@ get_kafka() {
kafka_dir=/opt/kafka-$version
url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_2.10-$version.tgz
+    # the .tgz above does not include the streams test jar, hence we need to get it separately
+    url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka-streams-$version-test.jar
if [ ! -d /opt/kafka-$version ]; then
pushd /tmp
curl -O $url
--
To stop receiving notification emails like this one, please contact
[email protected].