Repository: kafka Updated Branches: refs/heads/trunk 4a9e7607b -> 7d6515fb8
KAFKA-2741: Make SourceTaskContext and SinkTaskContext interfaces and keep implementations in runtime jar. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Guozhang Wang Closes #420 from ewencp/task-context-interfaces Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d6515fb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d6515fb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d6515fb Branch: refs/heads/trunk Commit: 7d6515fb8f6141f5c34fe8434e97ea6ebd65941f Parents: 4a9e760 Author: Ewen Cheslack-Postava <[email protected]> Authored: Thu Nov 5 08:44:44 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Nov 5 08:44:44 2015 -0800 ---------------------------------------------------------------------- .../kafka/copycat/sink/SinkTaskContext.java | 28 ++++------------ .../kafka/copycat/source/SourceTaskContext.java | 12 ++----- .../copycat/file/FileStreamSourceTaskTest.java | 5 ++- .../kafka/copycat/runtime/WorkerSinkTask.java | 20 ++++------- .../copycat/runtime/WorkerSinkTaskContext.java | 29 ++++++++++++++-- .../kafka/copycat/runtime/WorkerSourceTask.java | 3 +- .../runtime/WorkerSourceTaskContext.java | 35 ++++++++++++++++++++ .../copycat/runtime/WorkerSinkTaskTest.java | 7 ++-- 8 files changed, 87 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java index 399dcef..763b9a4 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java @@ -20,7 +20,6 @@ package org.apache.kafka.copycat.sink; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; -import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -28,14 +27,7 @@ import java.util.Set; * Context passed to SinkTasks, allowing them to access utilities in the copycat runtime. */ @InterfaceStability.Unstable -public abstract class SinkTaskContext { - protected Map<TopicPartition, Long> offsets; - protected long timeoutMs = -1L; - - public SinkTaskContext() { - offsets = new HashMap<>(); - } - +public interface SinkTaskContext { /** * Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record @@ -46,9 +38,7 @@ public abstract class SinkTaskContext { * * @param offsets map of offsets for topic partitions */ - public void offset(Map<TopicPartition, Long> offsets) { - this.offsets = offsets; - } + void offset(Map<TopicPartition, Long> offsets); /** * Reset the consumer offsets for the given topic partition. SinkTasks should use if they manage offsets @@ -61,9 +51,7 @@ public abstract class SinkTaskContext { * @param tp the topic partition to reset offset. * @param offset the offset to reset to. */ - public void offset(TopicPartition tp, long offset) { - offsets.put(tp, offset); - } + void offset(TopicPartition tp, long offset); /** * Set the timeout in milliseconds. SinkTasks should use this to indicate that they need to retry certain @@ -72,25 +60,23 @@ public abstract class SinkTaskContext { * issues. SinkTasks use this method to set how long to wait before retrying. * @param timeoutMs the backoff timeout in milliseconds. */ - public void timeout(long timeoutMs) { - this.timeoutMs = timeoutMs; - } + void timeout(long timeoutMs); /** * Get the current set of assigned TopicPartitions for this task. * @return the set of currently assigned TopicPartitions */ - public abstract Set<TopicPartition> assignment(); + Set<TopicPartition> assignment(); /** * Pause consumption of messages from the specified TopicPartitions. * @param partitions the partitions which should be paused */ - public abstract void pause(TopicPartition... partitions); + void pause(TopicPartition... partitions); /** * Resume consumption of messages from previously paused TopicPartitions. * @param partitions the partitions to resume */ - public abstract void resume(TopicPartition... partitions); + void resume(TopicPartition... partitions); } http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java index a3875e7..bc18c30 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java @@ -24,17 +24,9 @@ import org.apache.kafka.copycat.storage.OffsetStorageReader; * runtime. */ @InterfaceStability.Unstable -public class SourceTaskContext { - private final OffsetStorageReader reader; - - public SourceTaskContext(OffsetStorageReader reader) { - this.reader = reader; - } - +public interface SourceTaskContext { /** * Get the OffsetStorageReader for this SourceTask. */ - public OffsetStorageReader offsetStorageReader() { - return reader; - } + OffsetStorageReader offsetStorageReader(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java index d2781c9..4365def 100644 --- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java +++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java @@ -44,6 +44,7 @@ public class FileStreamSourceTaskTest { private File tempFile; private Properties config; private OffsetStorageReader offsetStorageReader; + private SourceTaskContext context; private FileStreamSourceTask task; private boolean verifyMocks = false; @@ -56,7 +57,8 @@ public class FileStreamSourceTaskTest { config.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC); task = new FileStreamSourceTask(); offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class); - task.initialize(new SourceTaskContext(offsetStorageReader)); + context = PowerMock.createMock(SourceTaskContext.class); + task.initialize(context); } @After @@ -142,6 +144,7 @@ public class FileStreamSourceTaskTest { private void expectOffsetLookupReturnNone() { + EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader); EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java index e9aa055..439a1f5 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java @@ -77,8 +77,14 @@ class WorkerSinkTask implements WorkerTask { public void start(Properties props) { consumer = createConsumer(props); context = new WorkerSinkTaskContext(consumer); + + // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions + // to work with. Any rewinding will be handled immediately when polling starts. + consumer.poll(0); + task.initialize(context); task.start(props); + workThread = createWorkerThread(); workThread.start(); } @@ -207,18 +213,6 @@ class WorkerSinkTask implements WorkerTask { log.debug("Task {} subscribing to topics {}", id, topics); newConsumer.subscribe(Arrays.asList(topics), new HandleRebalance()); - // Seek to any user-provided offsets. This is useful if offsets are tracked in the downstream system (e.g., to - // enable exactly once delivery to that system). - // - // To do this correctly, we need to first make sure we have been assigned partitions, which poll() will guarantee. - // We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly. - newConsumer.poll(0); - Map<TopicPartition, Long> offsets = context.offsets(); - for (TopicPartition tp : newConsumer.assignment()) { - Long offset = offsets.get(tp); - if (offset != null) - newConsumer.seek(tp, offset); - } return newConsumer; } @@ -264,7 +258,7 @@ class WorkerSinkTask implements WorkerTask { consumer.seek(tp, offset); } } - offsets.clear(); + context.clearOffsets(); } private class HandleRebalance implements ConsumerRebalanceListener { http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java index b8d7d54..b474589 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java @@ -16,17 +16,35 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.copycat.errors.IllegalWorkerStateException; import org.apache.kafka.copycat.sink.SinkTaskContext; +import java.util.HashMap; import java.util.Map; import java.util.Set; -public class WorkerSinkTaskContext extends SinkTaskContext { - - KafkaConsumer<byte[], byte[]> consumer; +public class WorkerSinkTaskContext implements SinkTaskContext { + private Map<TopicPartition, Long> offsets; + private long timeoutMs; + private KafkaConsumer<byte[], byte[]> consumer; public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) { + this.offsets = new HashMap<>(); + this.timeoutMs = -1L; this.consumer = consumer; } + @Override + public void offset(Map<TopicPartition, Long> offsets) { + this.offsets.putAll(offsets); + } + + @Override + public void offset(TopicPartition tp, long offset) { + offsets.put(tp, offset); + } + + public void clearOffsets() { + offsets.clear(); + } + /** * Get offsets that the SinkTask has submitted to be reset. Used by the Copycat framework. * @return the map of offsets @@ -35,6 +53,11 @@ public class WorkerSinkTaskContext extends SinkTaskContext { return offsets; } + @Override + public void timeout(long timeoutMs) { + this.timeoutMs = timeoutMs; + } + /** * Get the timeout in milliseconds set by SinkTasks. Used by the Copycat framework. * @return the backoff timeout in milliseconds. http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java index 78b588c..9740933 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java @@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.copycat.source.SourceRecord; import org.apache.kafka.copycat.source.SourceTask; -import org.apache.kafka.copycat.source.SourceTaskContext; import org.apache.kafka.copycat.storage.Converter; import org.apache.kafka.copycat.storage.OffsetStorageReader; import org.apache.kafka.copycat.storage.OffsetStorageWriter; @@ -87,7 +86,7 @@ class WorkerSourceTask implements WorkerTask { @Override public void start(Properties props) { - task.initialize(new SourceTaskContext(offsetReader)); + task.initialize(new WorkerSourceTaskContext(offsetReader)); task.start(props); workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id); workThread.start(); http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskContext.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskContext.java new file mode 100644 index 0000000..f19e4e6 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskContext.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.runtime; + +import org.apache.kafka.copycat.source.SourceTaskContext; +import org.apache.kafka.copycat.storage.OffsetStorageReader; + +public class WorkerSourceTaskContext implements SourceTaskContext { + + private final OffsetStorageReader reader; + + public WorkerSourceTaskContext(OffsetStorageReader reader) { + this.reader = reader; + } + + @Override + public OffsetStorageReader offsetStorageReader() { + return reader; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java index 08707c9..acc1179 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java @@ -400,13 +400,16 @@ public class WorkerSinkTaskTest extends ThreadedTest { } private void expectInitializeTask(Properties taskProps) throws Exception { + PowerMock.expectPrivate(workerTask, "createConsumer", taskProps) + .andReturn(consumer); + + EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.<byte[], byte[]>empty()); + sinkTask.initialize(EasyMock.capture(sinkTaskContext)); PowerMock.expectLastCall(); sinkTask.start(taskProps); PowerMock.expectLastCall(); - PowerMock.expectPrivate(workerTask, "createConsumer", taskProps) - .andReturn(consumer); workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start"}, workerTask, "mock-worker-thread", time, workerConfig);
