This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2119e9f Revert "KAFKA-6383: complete shutdown for CREATED StreamThreads (#4343)"
2119e9f is described below
commit 2119e9f26ed988746ee22d139d1eb1aca2bbb950
Author: Guozhang Wang <[email protected]>
AuthorDate: Tue Jan 2 14:21:42 2018 -0800
Revert "KAFKA-6383: complete shutdown for CREATED StreamThreads (#4343)"
This reverts commit 47db063c310cf47e4c544196acab2abfe62977b0.
---
.../streams/processor/internals/StreamThread.java | 31 +++++-----------
.../streams/processor/internals/TaskManager.java | 6 +--
.../org/apache/kafka/streams/KafkaStreamsTest.java | 24 ------------
.../processor/internals/StreamThreadTest.java | 43 +---------------------
4 files changed, 13 insertions(+), 91 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ff440cc..696081d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -173,26 +173,23 @@ public class StreamThread extends Thread {
/**
* Sets the state
* @param newState New state
- * @return The state prior to the call to setState, or null if the transition is invalid
*/
- State setState(final State newState) {
- final State oldState;
+ boolean setState(final State newState) {
+ final State oldState = state;
synchronized (stateLock) {
- oldState = state;
-
if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
// when the state is already in PENDING_SHUTDOWN, all other transitions will be
// refused but we do not throw exception here
- return null;
+ return false;
} else if (state == State.DEAD) {
// when the state is already in NOT_RUNNING, all its transitions
// will be refused but we do not throw exception here
- return null;
+ return false;
} else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) {
// when the state is already in PARTITIONS_REVOKED, its transition to itself will be
// refused but we do not throw exception here
- return null;
+ return false;
} else if (!state.isValidTransition(newState)) {
log.error("Unexpected state transition from {} to {}", oldState, newState);
throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
@@ -212,7 +209,7 @@ public class StreamThread extends Thread {
stateListener.onChange(this, state, oldState);
}
- return oldState;
+ return true;
}
public boolean isRunningAndNotRebalancing() {
@@ -254,7 +251,7 @@ public class StreamThread extends Thread {
final long start = time.milliseconds();
try {
- if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
+ if (!streamThread.setState(State.PARTITIONS_ASSIGNED)) {
return;
}
taskManager.createTasks(assignment);
@@ -284,7 +281,7 @@ public class StreamThread extends Thread {
taskManager.activeTaskIds(),
taskManager.standbyTaskIds());
- if (streamThread.setState(State.PARTITIONS_REVOKED) != null) {
+ if (streamThread.setState(State.PARTITIONS_REVOKED)) {
final long start = time.milliseconds();
try {
// suspend active tasks
@@ -717,11 +714,7 @@ public class StreamThread extends Thread {
@Override
public void run() {
log.info("Starting");
- if (setState(State.RUNNING) == null) {
- log.info("StreamThread already shutdown. Not running");
- completeShutdown(true);
- return;
- }
+ setState(State.RUNNING);
boolean cleanRun = false;
try {
runLoop();
@@ -1095,11 +1088,7 @@ public class StreamThread extends Thread {
*/
public void shutdown() {
log.info("Informed to shut down");
- State oldState = setState(State.PENDING_SHUTDOWN);
- if (oldState == State.CREATED) {
- // Start so that we shutdown on the thread
- this.start();
- }
+ setState(State.PENDING_SHUTDOWN);
}
private void completeShutdown(final boolean cleanRun) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index bdc1c00..d70c8f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -274,11 +274,7 @@ class TaskManager {
standby.close(clean);
// remove the changelog partitions from restore consumer
- try {
- restoreConsumer.unsubscribe();
- } catch (final RuntimeException fatalException) {
- firstException.compareAndSet(null, fatalException);
- }
+ restoreConsumer.unsubscribe();
taskCreator.close();
standbyTaskCreator.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index a2084b0..8746c62 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams;
-import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
@@ -32,7 +31,6 @@ import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestCondition;
@@ -116,28 +114,6 @@ public class KafkaStreamsTest {
}
@Test
- public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
- final StreamsBuilder builder = new StreamsBuilder();
- builder.globalTable("anyTopic");
- MockClientSupplier clientSupplier = new MockClientSupplier();
- final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(props), clientSupplier);
- streams.close();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return streams.state() == KafkaStreams.State.NOT_RUNNING;
- }
- }, 10 * 1000, "Streams never stopped.");
-
- // Ensure that any created clients are closed
- assertTrue(clientSupplier.consumer.closed());
- assertTrue(clientSupplier.restoreConsumer.closed());
- for (MockProducer p : clientSupplier.producers) {
- assertTrue(p.closed());
- }
- }
-
- @Test
public void testStateThreadClose() throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
// make sure we have the global state thread running too
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index cca7045..4250465 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -441,13 +441,8 @@ public class StreamThreadTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
- metrics,
- "",
- "",
- Collections.<String, String>emptyMap());
- final StreamThread thread = new StreamThread(
- mockTime,
+ StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
+ final StreamThread thread = new StreamThread(mockTime,
config,
consumer,
consumer,
@@ -464,40 +459,6 @@ public class StreamThreadTest {
}
@Test
- public void shouldShutdownTaskManagerOnCloseWithoutStart() {
- final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
- final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
- taskManager.shutdown(true);
- EasyMock.expectLastCall();
- EasyMock.replay(taskManager, consumer);
-
- final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
- metrics,
- "",
- "",
- Collections.<String, String>emptyMap());
- final StreamThread thread = new StreamThread(
- mockTime,
- config,
- consumer,
- consumer,
- null,
- taskManager,
- streamsMetrics,
- internalTopologyBuilder,
- clientId,
- new LogContext(""));
- thread.shutdown();
- try {
- thread.join(1000);
- } catch (final InterruptedException e) {
- fail("Join interrupted");
- }
- assertFalse(thread.isAlive());
- EasyMock.verify(taskManager);
- }
-
- @Test
public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws InterruptedException {
internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
internalTopologyBuilder.addSink("out", "output", null, null, null);
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].