Repository: kafka Updated Branches: refs/heads/trunk a0ca8f642 -> 23f9afb70
KAFKA-2482: Allow sink tasks to get their current assignment, as well as pause and resume topic partitions. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Guozhang Wang Closes #249 from ewencp/kafka-2482-sink-tasks-pause-consumption Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/23f9afb7 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/23f9afb7 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/23f9afb7 Branch: refs/heads/trunk Commit: 23f9afb70bc5cdbf66550a1e69161e2fe06a909a Parents: a0ca8f6 Author: Ewen Cheslack-Postava <[email protected]> Authored: Tue Oct 6 14:26:08 2015 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Oct 6 14:26:08 2015 -0700 ---------------------------------------------------------------------- .../errors/IllegalWorkerStateException.java | 35 ++++++ .../kafka/copycat/sink/SinkTaskContext.java | 19 +++ .../copycat/runtime/SinkTaskContextImpl.java | 24 ---- .../kafka/copycat/runtime/WorkerSinkTask.java | 35 +++++- .../copycat/runtime/WorkerSinkTaskTest.java | 118 ++++++++++++++++++- 5 files changed, 201 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java new file mode 100644 index 0000000..6f9f233 --- /dev/null +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.errors; + +/** + * Indicates that a method has been invoked illegally or at an invalid time by a connector or task. + */ +public class IllegalWorkerStateException extends CopycatException { + public IllegalWorkerStateException(String s) { + super(s); + } + + public IllegalWorkerStateException(String s, Throwable throwable) { + super(s, throwable); + } + + public IllegalWorkerStateException(Throwable throwable) { + super(throwable); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java index 67c045f..3ecff27 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; import java.util.HashMap; import java.util.Map; +import java.util.Set; /** * Context passed to SinkTasks, allowing them to access utilities in the copycat runtime. @@ -56,4 +57,22 @@ public abstract class SinkTaskContext { public Map<TopicPartition, Long> offsets() { return offsets; } + + /** + * Get the current set of assigned TopicPartitions for this task. + * @return the set of currently assigned TopicPartitions + */ + public abstract Set<TopicPartition> assignment(); + + /** + * Pause consumption of messages from the specified TopicPartitions. + * @param partitions the partitions which should be paused + */ + public abstract void pause(TopicPartition... partitions); + + /** + * Resume consumption of messages from previously paused TopicPartitions. + * @param partitions the partitions to resume + */ + public abstract void resume(TopicPartition... partitions); } http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java deleted file mode 100644 index f47c984..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.copycat.sink.SinkTaskContext; - -class SinkTaskContextImpl extends SinkTaskContext { - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java index cbda201..edb415a 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.copycat.cli.WorkerConfig; import org.apache.kafka.copycat.data.SchemaAndValue; import org.apache.kafka.copycat.errors.CopycatException; +import org.apache.kafka.copycat.errors.IllegalWorkerStateException; import org.apache.kafka.copycat.sink.SinkRecord; import org.apache.kafka.copycat.sink.SinkTask; import org.apache.kafka.copycat.sink.SinkTaskContext; @@ -59,8 +60,8 @@ class WorkerSinkTask implements WorkerTask { this.workerConfig = workerConfig; this.keyConverter = keyConverter; this.valueConverter = valueConverter; - context = new SinkTaskContextImpl(); this.time = time; + this.context = new WorkerSinkTaskContext(); } @Override @@ -234,4 +235,36 @@ class WorkerSinkTask implements WorkerTask { } } } + + + private class WorkerSinkTaskContext extends SinkTaskContext { + @Override + public Set<TopicPartition> assignment() { + if (consumer == null) + throw new IllegalWorkerStateException("SinkTaskContext may not be used to look up partition assignment until the task is initialized"); + return consumer.assignment(); + } + + @Override + public void pause(TopicPartition... partitions) { + if (consumer == null) + throw new IllegalWorkerStateException("SinkTaskContext may not be used to pause consumption until the task is initialized"); + try { + consumer.pause(partitions); + } catch (IllegalStateException e) { + throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e); + } + } + + @Override + public void resume(TopicPartition... partitions) { + if (consumer == null) + throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized"); + try { + consumer.resume(partitions); + } catch (IllegalStateException e) { + throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e); + } + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java index 687ed8f..e4d1d8e 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.copycat.cli.WorkerConfig; import org.apache.kafka.copycat.data.Schema; import org.apache.kafka.copycat.data.SchemaAndValue; +import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.sink.SinkRecord; import org.apache.kafka.copycat.sink.SinkTask; import org.apache.kafka.copycat.sink.SinkTaskContext; @@ -40,9 +41,15 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; @RunWith(PowerMockRunner.class) @PrepareForTest(WorkerSinkTask.class) @@ -53,6 +60,8 @@ public class WorkerSinkTaskTest extends ThreadedTest { // with mix of integer/string in Copycat private static final String TOPIC = "test"; private static final int PARTITION = 12; + private static final int PARTITION2 = 13; + private static final int PARTITION3 = 14; private static final long FIRST_OFFSET = 45; private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA; private static final int KEY = 12; @@ -62,10 +71,14 @@ public class WorkerSinkTaskTest extends ThreadedTest { private static final byte[] RAW_VALUE = "value".getBytes(); private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION); + private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2); + private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3); + private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200); private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); private Time time; @Mock private SinkTask sinkTask; + private Capture<SinkTaskContext> sinkTaskContext = EasyMock.newCapture(); private WorkerConfig workerConfig; @Mock private Converter keyConverter; @Mock @@ -266,9 +279,79 @@ public class WorkerSinkTaskTest extends ThreadedTest { PowerMock.verifyAll(); } - private KafkaConsumer<byte[], byte[]> expectInitializeTask(Properties taskProps) - throws Exception { - sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class)); + @Test + public void testAssignmentPauseResume() throws Exception { + // Just validate that the calls are passed through to the consumer, and that where appropriate errors are + // converted + + Properties taskProps = new Properties(); + + expectInitializeTask(taskProps); + + expectOnePoll().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + assertEquals(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)), + sinkTaskContext.getValue().assignment()); + return null; + } + }); + EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3))); + + expectOnePoll().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + try { + sinkTaskContext.getValue().pause(UNASSIGNED_TOPIC_PARTITION); + fail("Trying to pause unassigned partition should have thrown an Copycat exception"); + } catch (CopycatException e) { + // expected + } + sinkTaskContext.getValue().pause(TOPIC_PARTITION, TOPIC_PARTITION2); + return null; + } + }); + consumer.pause(UNASSIGNED_TOPIC_PARTITION); + PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition")); + consumer.pause(TOPIC_PARTITION, TOPIC_PARTITION2); + PowerMock.expectLastCall(); + + expectOnePoll().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + try { + sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION); + fail("Trying to resume unassigned partition should have thrown an Copycat exception"); + } catch (CopycatException e) { + // expected + } + + sinkTaskContext.getValue().resume(TOPIC_PARTITION, TOPIC_PARTITION2); + return null; + } + }); + consumer.resume(UNASSIGNED_TOPIC_PARTITION); + PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition")); + consumer.resume(TOPIC_PARTITION, TOPIC_PARTITION2); + PowerMock.expectLastCall(); + + expectStopTask(0); + + PowerMock.replayAll(); + + workerTask.start(taskProps); + workerThread.iteration(); + workerThread.iteration(); + workerThread.iteration(); + workerTask.stop(); + workerTask.close(); + + PowerMock.verifyAll(); + } + + + private void expectInitializeTask(Properties taskProps) throws Exception { + sinkTask.initialize(EasyMock.capture(sinkTaskContext)); PowerMock.expectLastCall(); sinkTask.start(taskProps); PowerMock.expectLastCall(); @@ -282,7 +365,6 @@ public class WorkerSinkTaskTest extends ThreadedTest { .andReturn(workerThread); workerThread.start(); PowerMock.expectLastCall(); - return consumer; } private void expectStopTask(final long expectedMessages) throws Exception { @@ -328,6 +410,32 @@ public class WorkerSinkTaskTest extends ThreadedTest { return capturedRecords; } + private IExpectationSetters<Object> expectOnePoll() { + // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of + // returning empty data, we return one record. The expectation is that the data will be ignored by the + // response behavior specified using the return value of this method. + EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( + new IAnswer<ConsumerRecords<byte[], byte[]>>() { + @Override + public ConsumerRecords<byte[], byte[]> answer() throws Throwable { + // "Sleep" so time will progress + time.sleep(1L); + ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( + Collections.singletonMap( + new TopicPartition(TOPIC, PARTITION), + Arrays.asList( + new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE) + ))); + recordsReturned++; + return records; + } + }); + EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)); + EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)); + sinkTask.put(EasyMock.anyObject(Collection.class)); + return EasyMock.expectLastCall(); + } + private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages, final RuntimeException flushError, final Exception consumerCommitError,
