Repository: kafka
Updated Branches:
  refs/heads/trunk 9a836d015 -> 958e10c87
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 8ff72bc..b3b6537 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -37,7 +37,7 @@ import java.util.Set;
 public abstract class AbstractTask {
 
     protected final TaskId id;
-    protected final String jobId;
+    protected final String applicationId;
     protected final ProcessorTopology topology;
     protected final Consumer consumer;
     protected final ProcessorStateManager stateMgr;
@@ -45,7 +45,7 @@ public abstract class AbstractTask {
     protected ProcessorContext processorContext;
 
     protected AbstractTask(TaskId id,
-                           String jobId,
+                           String applicationId,
                            Collection<TopicPartition> partitions,
                            ProcessorTopology topology,
                            Consumer<byte[], byte[]> consumer,
@@ -53,17 +53,17 @@ public abstract class AbstractTask {
                            StreamsConfig config,
                            boolean isStandby) {
         this.id = id;
-        this.jobId = jobId;
+        this.applicationId = applicationId;
         this.partitions = new HashSet<>(partitions);
         this.topology = topology;
         this.consumer = consumer;
 
         // create the processor state manager
         try {
-            File jobStateDir = StreamThread.makeStateDir(jobId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
-            File stateFile = new File(jobStateDir.getCanonicalPath(), id.toString());
+            File applicationStateDir = StreamThread.makeStateDir(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
+            File stateFile = new File(applicationStateDir.getCanonicalPath(), id.toString());
             // if partitions is null, this is a standby task
-            this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
+            this.stateMgr = new ProcessorStateManager(applicationId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
         } catch (IOException e) {
             throw new ProcessorStateException("Error while creating the state manager", e);
         }
@@ -83,8 +83,8 @@ public abstract class AbstractTask {
         return id;
     }
 
-    public final String jobId() {
-        return jobId;
+    public final String applicationId() {
+        return applicationId;
     }
 
     public final Set<TopicPartition> partitions() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index c4acc01..f6e43d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -27,15 +27,10 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.TaskId;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 
 public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier {
 
-    private static final Logger log = LoggerFactory.getLogger(ProcessorContextImpl.class);
-
     private final TaskId id;
     private final StreamTask task;
     private final StreamsMetrics metrics;
@@ -84,8 +79,8 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public String jobId() {
-        return task.jobId();
+    public String applicationId() {
+        return task.applicationId();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index c8f289e..df8516c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -51,7 +51,7 @@ public class ProcessorStateManager {
     public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
     public static final String LOCK_FILE_NAME = ".lock";
 
-    private final String jobId;
+    private final String applicationId;
     private final int defaultPartition;
     private final Map<String, TopicPartition> partitionForTopic;
     private final File baseDir;
@@ -65,8 +65,8 @@ public class ProcessorStateManager {
     private final boolean isStandby;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
 
-    public ProcessorStateManager(String jobId, int defaultPartition, Collection<TopicPartition> sources, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
-        this.jobId = jobId;
+    public ProcessorStateManager(String applicationId, int defaultPartition, Collection<TopicPartition> sources, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
+        this.applicationId = applicationId;
         this.defaultPartition = defaultPartition;
         this.partitionForTopic = new HashMap<>();
         for (TopicPartition source : sources) {
@@ -104,8 +104,8 @@ public class ProcessorStateManager {
         }
     }
 
-    public static String storeChangelogTopic(String jobId, String storeName) {
-        return jobId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
+    public static String storeChangelogTopic(String applicationId, String storeName) {
+        return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
     }
 
     public static FileLock lockStateDirectory(File stateDir) throws IOException {
@@ -154,7 +154,7 @@ public class ProcessorStateManager {
         // check that the underlying change log topic exist or not
         String topic;
         if (loggingEnabled)
-            topic = storeChangelogTopic(this.jobId, store.name());
+            topic = storeChangelogTopic(this.applicationId, store.name());
         else
             topic = store.name();
 
         // block until the partition is ready for this state changelog topic or time has elapsed
@@ -325,7 +325,7 @@ public class ProcessorStateManager {
         for (String storeName : stores.keySet()) {
             TopicPartition part;
             if (loggingEnabled.contains(storeName))
-                part = new TopicPartition(storeChangelogTopic(jobId, storeName), getPartition(storeName));
+                part = new TopicPartition(storeChangelogTopic(applicationId, storeName), getPartition(storeName));
             else
                 part = new TopicPartition(storeName, getPartition(storeName));
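Note on the rename above: storeChangelogTopic() now derives state changelog topic names from the application ID instead of the job ID. Below is a minimal, self-contained sketch of that naming convention; the class name and the "-changelog" suffix value are assumptions for illustration (the real code references the STATE_CHANGELOG_TOPIC_SUFFIX constant).

    // Sketch only, not part of this commit: how changelog topic names are composed.
    public final class ChangelogTopicNamingSketch {

        // Assumed value standing in for ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX.
        private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";

        // Same composition as the renamed helper: <applicationId>-<storeName><suffix>
        static String storeChangelogTopic(String applicationId, String storeName) {
            return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
        }

        public static void main(String[] args) {
            // Prints "word-count-app-counts-changelog" under the assumed suffix.
            System.out.println(storeChangelogTopic("word-count-app", "counts"));
        }
    }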
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 82633b4..0bcae18 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -25,17 +25,13 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 
 public class StandbyContextImpl implements ProcessorContext, RecordCollector.Supplier {
 
-    private static final Logger log = LoggerFactory.getLogger(StandbyContextImpl.class);
-
     private final TaskId id;
-    private final String jobId;
+    private final String applicationId;
     private final StreamsMetrics metrics;
     private final ProcessorStateManager stateMgr;
@@ -47,12 +43,12 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
 
     private boolean initialized;
 
     public StandbyContextImpl(TaskId id,
-                              String jobId,
+                              String applicationId,
                               StreamsConfig config,
                               ProcessorStateManager stateMgr,
                               StreamsMetrics metrics) {
         this.id = id;
-        this.jobId = jobId;
+        this.applicationId = applicationId;
         this.metrics = metrics;
         this.stateMgr = stateMgr;
 
@@ -78,8 +74,8 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     }
 
     @Override
-    public String jobId() {
-        return jobId;
+    public String applicationId() {
+        return applicationId;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index da454cb..f19d5a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -23,8 +23,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -36,15 +34,13 @@ import java.util.Map;
  */
 public class StandbyTask extends AbstractTask {
 
-    private static final Logger log = LoggerFactory.getLogger(StandbyTask.class);
-
     private final Map<TopicPartition, Long> checkpointedOffsets;
 
     /**
      * Create {@link StandbyTask} with its assigned partitions
      *
     * @param id                    the ID of this task
-     * @param jobId                 the ID of the job
+     * @param applicationId         the ID of the stream processing application
     * @param partitions            the collection of assigned {@link TopicPartition}
     * @param topology              the instance of {@link ProcessorTopology}
     * @param consumer              the instance of {@link Consumer}
@@ -53,17 +49,17 @@ public class StandbyTask extends AbstractTask {
     * @param metrics               the {@link StreamsMetrics} created by the thread
     */
    public StandbyTask(TaskId id,
-                       String jobId,
+                       String applicationId,
                        Collection<TopicPartition> partitions,
                        ProcessorTopology topology,
                        Consumer<byte[], byte[]> consumer,
                        Consumer<byte[], byte[]> restoreConsumer,
                        StreamsConfig config,
                        StreamsMetrics metrics) {
-        super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true);
+        super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, true);
 
        // initialize the topology with its own context
-        this.processorContext = new StandbyContextImpl(id, jobId, config, stateMgr, metrics);
+        this.processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);
 
        initializeStateStores();

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 266df3e..a6b82af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -117,7 +117,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         streamThread = (StreamThread) o;
         streamThread.partitionAssignor(this);
 
-        this.topicGroups = streamThread.builder.topicGroups(streamThread.jobId);
+        this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId);
 
         if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
             internalTopicManager = new InternalTopicManager(
@@ -445,7 +445,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     /* For Test Only */
     public Set<TaskId> tasksForState(String stateName) {
-        return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.jobId, stateName));
+        return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.applicationId, stateName));
     }
 
     public Set<TaskId> tasksForPartition(TopicPartition partition) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4d66324..54a25c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -61,7 +61,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
     * Create {@link StreamTask} with its assigned partitions
     *
     * @param id                    the ID of this task
-     * @param jobId                 the ID of the job
+     * @param applicationId         the ID of the stream processing application
     * @param partitions            the collection of assigned {@link TopicPartition}
     * @param topology              the instance of {@link ProcessorTopology}
     * @param consumer              the instance of {@link Consumer}
@@ -71,7 +71,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
     * @param metrics               the {@link StreamsMetrics} created by the thread
     */
    public StreamTask(TaskId id,
-                      String jobId,
+                      String applicationId,
                       Collection<TopicPartition> partitions,
                       ProcessorTopology topology,
                       Consumer<byte[], byte[]> consumer,
@@ -79,7 +79,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
                       Consumer<byte[], byte[]> restoreConsumer,
                       StreamsConfig config,
                       StreamsMetrics metrics) {
-        super(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, false);
+        super(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, false);
 
        this.punctuationQueue = new PunctuationQueue();
        this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e9343e0..491c812 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -72,7 +72,7 @@ public class StreamThread extends Thread {
     private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
 
     public final PartitionGrouper partitionGrouper;
-    public final String jobId;
+    public final String applicationId;
     public final String clientId;
     public final UUID processId;
 
@@ -106,12 +106,12 @@ public class StreamThread extends Thread {
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
     private boolean processStandbyRecords = false;
 
-    static File makeStateDir(String jobId, String baseDirName) {
+    static File makeStateDir(String applicationId, String baseDirName) {
         File baseDir = new File(baseDirName);
         if (!baseDir.exists())
             baseDir.mkdir();
 
-        File stateDir = new File(baseDir, jobId);
+        File stateDir = new File(baseDir, applicationId);
         if (!stateDir.exists())
             stateDir.mkdir();
 
@@ -150,12 +150,12 @@ public class StreamThread extends Thread {
 
     public StreamThread(TopologyBuilder builder,
                         StreamsConfig config,
-                        String jobId,
+                        String applicationId,
                         String clientId,
                         UUID processId,
                         Metrics metrics,
                         Time time) {
-        this(builder, config, null , null, null, jobId, clientId, processId, metrics, time);
+        this(builder, config, null , null, null, applicationId, clientId, processId, metrics, time);
     }
 
     StreamThread(TopologyBuilder builder,
@@ -163,17 +163,17 @@
                  Producer<byte[], byte[]> producer,
                  Consumer<byte[], byte[]> consumer,
                  Consumer<byte[], byte[]> restoreConsumer,
-                 String jobId,
+                 String applicationId,
                  String clientId,
                  UUID processId,
                  Metrics metrics,
                  Time time) {
         super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
 
-        this.jobId = jobId;
+        this.applicationId = applicationId;
         this.config = config;
         this.builder = builder;
-        this.sourceTopics = builder.sourceTopics(jobId);
+        this.sourceTopics = builder.sourceTopics(applicationId);
         this.clientId = clientId;
         this.processId = processId;
         this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
@@ -194,7 +194,7 @@ public class StreamThread extends Thread {
         this.standbyRecords = new HashMap<>();
 
         // read in task specific config values
-        this.stateDir = makeStateDir(this.jobId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
+        this.stateDir = makeStateDir(this.applicationId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
         this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
         this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
@@ -224,7 +224,7 @@ public class StreamThread extends Thread {
     private Consumer<byte[], byte[]> createConsumer() {
         String threadName = this.getName();
         log.info("Creating consumer client for stream thread [" + threadName + "]");
-        return new KafkaConsumer<>(config.getConsumerConfigs(this, this.jobId, this.clientId + "-" + threadName),
+        return new KafkaConsumer<>(config.getConsumerConfigs(this, this.applicationId, this.clientId + "-" + threadName),
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
     }
@@ -580,9 +580,9 @@ public class StreamThread extends Thread {
     protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(jobId, id.topicGroupId);
+        ProcessorTopology topology = builder.build(applicationId, id.topicGroupId);
 
-        return new StreamTask(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
+        return new StreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
     }
 
     private void addStreamTasks(Collection<TopicPartition> assignment) {
@@ -650,10 +650,10 @@ public class StreamThread extends Thread {
     protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(jobId, id.topicGroupId);
+        ProcessorTopology topology = builder.build(applicationId, id.topicGroupId);
 
         if (!topology.stateStoreSuppliers().isEmpty()) {
-            return new StandbyTask(id, jobId, partitions, topology, consumer, restoreConsumer, config, sensors);
+            return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, sensors);
         } else {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index aac4d85..4229f94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -57,7 +57,7 @@ public class StoreChangeLogger<K, V> {
     }
 
     protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
-        this.topic = ProcessorStateManager.storeChangelogTopic(context.jobId(), storeName);
+        this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
         this.context = context;
         this.partition = partition;
         this.serialization = serialization;
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index f0276ab..83ebe48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -41,7 +41,7 @@ public class StreamsConfigTest {
 
     @Before
     public void setUp() {
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-config-test");
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@@ -59,9 +59,10 @@ public class StreamsConfigTest {
 
     @Test
     public void testGetConsumerConfigs() throws Exception {
-        Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client");
+        Map<String, Object> returnedProps =
+            streamsConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-application", "client");
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
-        assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job");
+        assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 14cb493..1d0a969 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -183,12 +183,12 @@ public class ProcessorStateManagerTest {
     }
 
     private final Set<TopicPartition> noPartitions = Collections.emptySet();
-    private final String jobId = "test-job";
+    private final String applicationId = "test-application";
     private final String stateDir = "test";
     private final String persistentStoreName = "persistentStore";
     private final String nonPersistentStoreName = "nonPersistentStore";
-    private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(jobId, persistentStoreName);
-    private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(jobId, nonPersistentStoreName);
+    private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
+    private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName);
 
     @Test
     public void testLockStateDirectory() throws IOException {
@@ -197,7 +197,7 @@ public class ProcessorStateManagerTest {
         FileLock lock;
 
         // the state manager locks the directory
-        ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
 
         try {
             // this should not get the lock
@@ -226,7 +226,7 @@ public class ProcessorStateManagerTest {
         try {
             MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
             try {
                 stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
             } finally {
@@ -258,7 +258,7 @@ public class ProcessorStateManagerTest {
             MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, noPartitions, baseDir, restoreConsumer, false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 2, noPartitions, baseDir, restoreConsumer, false);
             try {
                 restoreConsumer.reset();
@@ -311,7 +311,7 @@ public class ProcessorStateManagerTest {
             MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, noPartitions, baseDir, restoreConsumer, false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 2, noPartitions, baseDir, restoreConsumer, false);
             try {
                 restoreConsumer.reset();
@@ -351,9 +351,9 @@ public class ProcessorStateManagerTest {
         String storeName2 = "store2";
         String storeName3 = "store3";
 
-        String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId, storeName1);
-        String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId, storeName2);
-        String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(jobId, storeName3);
+        String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
+        String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
+        String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3);
 
         OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
         checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
@@ -386,7 +386,7 @@ public class ProcessorStateManagerTest {
             // if there is an source partition, inherit the partition id
             Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 0, sourcePartitions, baseDir, restoreConsumer, true); // standby
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 0, sourcePartitions, baseDir, restoreConsumer, true); // standby
 
             try {
                 restoreConsumer.reset();
@@ -425,7 +425,7 @@ public class ProcessorStateManagerTest {
             MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, restoreConsumer, false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, restoreConsumer, false);
             try {
                 stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
@@ -462,12 +462,12 @@ public class ProcessorStateManagerTest {
             HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
             ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L);
             ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L);
-            ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(jobId, "otherTopic"), 1), 789L);
+            ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L);
 
             MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
             MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, restoreConsumer, false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, restoreConsumer, false);
             try {
                 // make sure the checkpoint file is deleted
                 assertFalse(checkpointFile.exists());

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index c8115b8..12210cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -67,7 +67,7 @@ public class ProcessorTopologyTest {
         // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
         File localState = StateTestUtils.tempDir();
         Properties props = new Properties();
-        props.setProperty(StreamsConfig.JOB_ID_CONFIG, "processor-topology-test");
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
         props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 295f0dd..21bdaff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -57,11 +57,11 @@ public class StandbyTaskTest {
 
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
 
-    private final String jobId = "test-job";
+    private final String applicationId = "test-application";
     private final String storeName1 = "store1";
     private final String storeName2 = "store2";
-    private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId, storeName1);
-    private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId, storeName2);
+    private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
+    private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
 
     private final TopicPartition partition1 = new TopicPartition(storeChangelogTopicName1, 1);
     private final TopicPartition partition2 = new TopicPartition(storeChangelogTopicName2, 1);
@@ -94,7 +94,7 @@ public class StandbyTaskTest {
             setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
             setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
             setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-            setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
+            setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
             setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
             setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
             setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
@@ -133,7 +133,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
@@ -148,7 +148,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@@ -167,7 +167,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@@ -201,7 +201,7 @@ public class StandbyTaskTest {
 
             task.close();
 
-            File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
+            File taskDir = new File(StreamThread.makeStateDir(applicationId, baseDir.getCanonicalPath()), taskId.toString());
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
             Map<TopicPartition, Long> offsets = checkpoint.read();
@@ -230,7 +230,7 @@ public class StandbyTaskTest {
             ));
 
             StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, jobId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, applicationId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@@ -299,7 +299,7 @@ public class StandbyTaskTest {
 
             task.close();
 
-            File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
+            File taskDir = new File(StreamThread.makeStateDir(applicationId, baseDir.getCanonicalPath()), taskId.toString());
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
             Map<TopicPartition, Long> offsets = checkpoint.read();

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 7f37bda..a5990bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -94,7 +94,7 @@ public class StreamPartitionAssignorTest {
             setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
             setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
             setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-            setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-partition-assignor-test");
+            setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
             setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
             setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
     }
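The test configurations above show the user-facing side of this rename: StreamsConfig.APPLICATION_ID_CONFIG replaces JOB_ID_CONFIG, and testGetConsumerConfigs in StreamsConfigTest confirms the value is also used as the consumer group ID. For reference, a minimal sketch of a client configuration using the renamed property; the ID and broker address are placeholders, and depending on the Streams version additional (de)serializer settings beyond those shown may be required:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsConfig;

    public class ApplicationIdConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Renamed in this commit: APPLICATION_ID_CONFIG replaces JOB_ID_CONFIG.
            // The value doubles as the consumer group id and prefixes changelog
            // topic and state directory names.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

            StreamsConfig config = new StreamsConfig(props);
            System.out.println(config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
        }
    }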
http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1f401db..f2ade6b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -78,7 +78,7 @@ public class StreamTaskTest {
             setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
             setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
             setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-            setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-task-test");
+            setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-task-test");
             setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
             setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
             setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
@@ -105,7 +105,7 @@ public class StreamTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
+            StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -156,7 +156,7 @@ public class StreamTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
+            StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index eaaf842..b201c07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -60,7 +60,7 @@ import java.util.UUID;
 public class StreamThreadTest {
 
     private final String clientId = "clientId";
-    private final String jobId = "stream-thread-test";
+    private final String applicationId = "stream-thread-test";
     private final UUID processId = UUID.randomUUID();
 
     private TopicPartition t1p1 = new TopicPartition("topic1", 1);
@@ -118,7 +118,7 @@ public class StreamThreadTest {
             setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
             setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
             setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-            setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
+            setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
             setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
             setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
     }
@@ -129,14 +129,14 @@ public class StreamThreadTest {
         public boolean committed = false;
 
         public TestStreamTask(TaskId id,
-                              String jobId,
+                              String applicationId,
                               Collection<TopicPartition> partitions,
                               ProcessorTopology topology,
                               Consumer<byte[], byte[]> consumer,
                               Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> restoreConsumer,
                               StreamsConfig config) {
-            super(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, null);
+            super(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, null);
         }
 
         @Override
@@ -168,11 +168,11 @@ public class StreamThreadTest {
         builder.addSource("source3", "topic3");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, applicationId, clientId, processId, new Metrics(), new SystemTime()) {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
             }
         };
 
@@ -271,12 +271,12 @@ public class StreamThreadTest {
 
         StreamsConfig config = new StreamsConfig(props);
 
-        File jobDir = new File(baseDir, jobId);
-        jobDir.mkdir();
-        File stateDir1 = new File(jobDir, task1.toString());
-        File stateDir2 = new File(jobDir, task2.toString());
-        File stateDir3 = new File(jobDir, task3.toString());
-        File extraDir = new File(jobDir, "X");
+        File applicationDir = new File(baseDir, applicationId);
+        applicationDir.mkdir();
+        File stateDir1 = new File(applicationDir, task1.toString());
+        File stateDir2 = new File(applicationDir, task2.toString());
+        File stateDir3 = new File(applicationDir, task3.toString());
+        File extraDir = new File(applicationDir, "X");
         stateDir1.mkdir();
         stateDir2.mkdir();
         stateDir3.mkdir();
@@ -290,7 +290,7 @@ public class StreamThreadTest {
         TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source1", "topic1");
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), mockTime) {
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, applicationId, clientId, processId, new Metrics(), mockTime) {
             @Override
             public void maybeClean() {
                 super.maybeClean();
@@ -299,7 +299,7 @@ public class StreamThreadTest {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
             }
         };
 
@@ -412,7 +412,7 @@ public class StreamThreadTest {
         TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source1", "topic1");
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), mockTime) {
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, applicationId, clientId, processId, new Metrics(), mockTime) {
             @Override
             public void maybeCommit() {
                 super.maybeCommit();
@@ -421,7 +421,7 @@ public class StreamThreadTest {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
             }
         };
 
@@ -482,7 +482,7 @@ public class StreamThreadTest {
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
 
-        partitionAssignor.configure(config.getConsumerConfigs(thread, thread.jobId, thread.clientId));
+        partitionAssignor.configure(config.getConsumerConfigs(thread, thread.applicationId, thread.clientId));
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 6cb45f3..063eafe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -81,7 +81,7 @@ public class SmokeTestClient extends SmokeTestUtil {
     private static KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper) {
         Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "SmokeTest");
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index d597fd2..b463669 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -106,8 +106,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public String jobId() {
-        return "mockJob";
+    public String applicationId() {
+        return "mockApplication";
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index cf17dbe..a2948a2 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -126,7 +126,7 @@ public class ProcessorTopologyTestDriver {
 
     private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
 
-    private final String jobId = "test-driver-job";
+    private final String applicationId = "test-driver-application";
 
     private final TaskId id;
     private final ProcessorTopology topology;
@@ -167,7 +167,7 @@ public class ProcessorTopologyTestDriver {
         }
 
         task = new StreamTask(id,
-                              jobId,
+                              applicationId,
                               partitionsByTopic.values(),
                               topology,
                               consumer,
@@ -334,7 +334,7 @@ public class ProcessorTopologyTestDriver {
         };
         // For each store name ...
         for (String storeName : storeNames) {
-            String topicName = ProcessorStateManager.storeChangelogTopic(jobId, storeName);
+            String topicName = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
             // Set up the restore-state topic ...
             // consumer.subscribe(new TopicPartition(topicName, 1));
             // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
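Taken together, makeStateDir() in StreamThread and the AbstractTask constructor imply an on-disk layout of <state.dir>/<applicationId>/<taskId> after this rename. A small sketch of that path composition; the class name, helper name, and paths are illustrative only, not from the commit:

    import java.io.File;

    public class StateDirLayoutSketch {
        // Mirrors makeStateDir() plus AbstractTask: base dir, then a
        // per-application subdirectory, then a per-task subdirectory.
        static File taskStateDir(String stateDirConfig, String applicationId, String taskId) {
            File baseDir = new File(stateDirConfig);
            File applicationDir = new File(baseDir, applicationId);
            return new File(applicationDir, taskId);
        }

        public static void main(String[] args) {
            // Prints a path like /tmp/kafka-streams/my-streams-app/0_1
            // ("0_1" follows the TaskId <topicGroupId>_<partition> format).
            System.out.println(taskStateDir("/tmp/kafka-streams", "my-streams-app", "0_1"));
        }
    }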
