Repository: samza Updated Branches: refs/heads/master 97a3be397 -> 944dd02e1
SAMZA-979 - Remove KafkaCheckpointMigration Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/74b0f840 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/74b0f840 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/74b0f840 Branch: refs/heads/master Commit: 74b0f8405a73fedda7a0b4b517f3946027d15626 Parents: 97a3be3 Author: Xinyu Liu <xi...@linkedin.com> Authored: Fri Jul 29 12:58:13 2016 -0700 Committer: Navina Ramesh <nram...@linkedin.com> Committed: Fri Jul 29 12:58:23 2016 -0700 ---------------------------------------------------------------------- .../apache/samza/checkpoint/OffsetManager.scala | 1 - .../scala/org/apache/samza/job/JobRunner.scala | 4 - .../samza/migration/JobRunnerMigration.scala | 58 ----- .../org/apache/samza/job/TestJobRunner.scala | 20 +- .../migration/KafkaCheckpointMigration.scala | 149 ------------ .../TestKafkaCheckpointMigration.scala | 243 ------------------- 6 files changed, 1 insertion(+), 474 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index 7245902..f8033c5 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -223,7 +223,6 @@ class OffsetManager( } } - partitionOffsets.foreach(p => info("task " + taskName + " checkpoint " + p._1 + ", " + p._2)) checkpointManager.writeCheckpoint(taskName, new Checkpoint(partitionOffsets)) Option(lastProcessedOffsets.get(taskName)) match { case Some(sspToOffsets) => sspToOffsets.foreach { case (ssp, checkpoint) => offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint) } http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index a3613ff..383bb13 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -24,7 +24,6 @@ import org.apache.samza.config.{ConfigRewriter, Config} import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig} import org.apache.samza.job.ApplicationStatus.Running -import org.apache.samza.migration.JobRunnerMigration import org.apache.samza.util.CommandLine import org.apache.samza.util.Logging import org.apache.samza.util.Util @@ -122,9 +121,6 @@ class JobRunner(config: Config) extends Logging { } coordinatorSystemProducer.stop - // Perform any migration plan to run in job runner - JobRunnerMigration(config) - // Create the actual job, and submit it. val job = jobFactory.getJob(config).submit http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala b/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala deleted file mode 100644 index f38b87a..0000000 --- a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.migration - -import org.apache.samza.config.Config -import org.apache.samza.util.{Util, Logging} -import org.apache.samza.SamzaException - - -object JobRunnerMigration { - val CHECKPOINT_MIGRATION = "org.apache.samza.migration.KafkaCheckpointMigration" - val UNSUPPORTED_ERROR_MSG = "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " + - "for everything else, please use the checkpoint tool to migrate the taskname-to-changelog mapping, and add " + - "task.checkpoint.skip-migration=true to your configs." - def apply(config: Config) = { - val migration = new JobRunnerMigration - migration.checkpointMigration(config) - } -} - -class JobRunnerMigration extends Logging { - - def checkpointMigration(config: Config) = { - val checkpointFactory = Option(config.get("task.checkpoint.factory")) - checkpointFactory match { - case Some("org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory") => - info("Performing checkpoint migration") - val checkpointMigrationPlan = Util.getObj[MigrationPlan](JobRunnerMigration.CHECKPOINT_MIGRATION) - checkpointMigrationPlan.migrate(config) - case None => - info("No task.checkpoint.factory defined, not performing any checkpoint migration") - case _ => - val skipMigration = config.getBoolean("task.checkpoint.skip-migration", false) - if (skipMigration) { - info("Job is configured to skip any checkpoint migration.") - } else { - error(JobRunnerMigration.UNSUPPORTED_ERROR_MSG) - throw new SamzaException(JobRunnerMigration.UNSUPPORTED_ERROR_MSG) - } - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala index e97656a..17c5297 100644 --- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala +++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala @@ -21,13 +21,11 @@ package org.apache.samza.job import java.io.File -import org.apache.samza.SamzaException import org.apache.samza.config.Config import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory -import org.apache.samza.migration.JobRunnerMigration -import org.junit.Test import org.junit.After import org.junit.Assert._ +import org.junit.Test object TestJobRunner { var processCount = 0 @@ -41,22 +39,6 @@ class TestJobRunner { } @Test - def testJobRunnerMigrationFails { - MockCoordinatorStreamSystemFactory.enableMockConsumerCache() - - try { - JobRunner.main(Array( - "--config-factory", - "org.apache.samza.config.factories.PropertiesConfigFactory", - "--config-path", - "file://%s/src/test/resources/test-migration-fail.properties" format new File(".").getCanonicalPath)) - fail("Should have failed already.") - } catch { - case se: SamzaException => assertEquals(se.getMessage, JobRunnerMigration.UNSUPPORTED_ERROR_MSG) - } - } - - @Test def testJobRunnerWorks { MockCoordinatorStreamSystemFactory.enableMockConsumerCache() http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala b/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala deleted file mode 100644 index 5d2641a..0000000 --- a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.migration - -import kafka.utils.ZKStringSerializer -import org.I0Itec.zkclient.ZkClient -import org.apache.samza.SamzaException -import org.apache.samza.checkpoint.kafka.{KafkaCheckpointManager, KafkaCheckpointManagerFactory} -import org.apache.samza.config.Config -import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config.KafkaConfig.Config2Kafka -import org.apache.samza.coordinator.stream.messages.{CoordinatorStreamMessage, SetMigrationMetaMessage} -import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemFactory, CoordinatorStreamSystemProducer} -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.storage.ChangelogPartitionManager -import org.apache.samza.util._ - -/** - * Migrates changelog partition mapping from checkpoint topic to coordinator stream - */ -class KafkaCheckpointMigration extends MigrationPlan with Logging { - val source = "CHECKPOINTMIGRATION" - val migrationKey = "CheckpointMigration09to10" - val migrationVal = "true" - - var connectZk: () => ZkClient = null - - private def getCheckpointSystemName(config: Config): String = { - config - .getCheckpointSystem - .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager.")) - } - - private def getClientId(config: Config): String = { - KafkaUtil.getClientId("samza-checkpoint-manager", config) - } - - private def getTopicMetadataStore(config: Config): TopicMetadataStore = { - val clientId = getClientId(config) - val systemName = getCheckpointSystemName(config) - - val producerConfig = config.getKafkaSystemProducerConfig( - systemName, - clientId, - KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES) - - val consumerConfig = config.getKafkaSystemConsumerConfig( - systemName, - clientId) - - new ClientUtilTopicMetadataStore(producerConfig.bootsrapServers, clientId, consumerConfig.socketTimeoutMs) - } - - private def getConnectZk(config: Config): () => ZkClient = { - val clientId = getClientId(config) - - val checkpointSystemName = getCheckpointSystemName(config) - - val consumerConfig = config.getKafkaSystemConsumerConfig( - checkpointSystemName, - clientId) - - val zkConnectString = Option(consumerConfig.zkConnect) - .getOrElse(throw new SamzaException("no zookeeper.connect defined in config")) - () => { - new ZkClient(zkConnectString, 6000, 6000, ZKStringSerializer) - } - } - - override def migrate(config: Config) { - val jobName = config.getName.getOrElse(throw new SamzaException("Cannot find job name. Cannot proceed with migration.")) - val jobId = config.getJobId.getOrElse("1") - - val checkpointTopicName = KafkaUtil.getCheckpointTopic(jobName, jobId) - - val coordinatorSystemFactory = new CoordinatorStreamSystemFactory - val coordinatorSystemConsumer = coordinatorSystemFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap) - val coordinatorSystemProducer = coordinatorSystemFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap) - - val checkpointManager = new KafkaCheckpointManagerFactory().getCheckpointManager(config, new NoOpMetricsRegistry).asInstanceOf[KafkaCheckpointManager] - - val kafkaUtil = new KafkaUtil(new ExponentialSleepStrategy, getConnectZk(config)) - - // make sure to validate that we only perform migration when checkpoint topic exists - if (kafkaUtil.topicExists(checkpointTopicName)) { - kafkaUtil.validateTopicPartitionCount( - checkpointTopicName, - getCheckpointSystemName(config), - getTopicMetadataStore(config), - 1, - config.failOnCheckpointValidation) - - if (migrationVerification(coordinatorSystemConsumer)) { - info("Migration %s was already performed, doing nothing" format migrationKey) - return - } - - info("No previous migration for %s were detected, performing migration" format migrationKey) - - info("Loading changelog partition mapping from checkpoint topic - %s" format checkpointTopicName) - val changelogMap = checkpointManager.readChangeLogPartitionMapping() - checkpointManager.stop - - info("Writing changelog to coordinator stream topic - %s" format Util.getCoordinatorStreamName(jobName, jobId)) - val changelogPartitionManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source) - changelogPartitionManager.start() - changelogPartitionManager.writeChangeLogPartitionMapping(changelogMap) - changelogPartitionManager.stop() - } - migrationCompletionMark(coordinatorSystemProducer) - - } - - def migrationVerification(coordinatorSystemConsumer : CoordinatorStreamSystemConsumer): Boolean = { - coordinatorSystemConsumer.register() - coordinatorSystemConsumer.start() - coordinatorSystemConsumer.bootstrap() - val stream = coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE) - val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal) - stream.contains(message.asInstanceOf[CoordinatorStreamMessage]) - } - - def migrationCompletionMark(coordinatorSystemProducer: CoordinatorStreamSystemProducer) = { - info("Marking completion of migration %s" format migrationKey) - val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal) - coordinatorSystemProducer.register(source) - coordinatorSystemProducer.start() - coordinatorSystemProducer.send(message) - coordinatorSystemProducer.stop() - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala b/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala deleted file mode 100644 index 504fc89..0000000 --- a/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.migration - -import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.{TestUtils, TestZKUtils, Utils} -import kafka.zk.EmbeddedZookeeper -import org.I0Itec.zkclient.ZkClient -import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord} -import org.apache.samza.checkpoint.Checkpoint -import org.apache.samza.checkpoint.kafka.{KafkaCheckpointManager, KafkaCheckpointLogKey, KafkaCheckpointManagerFactory} -import org.apache.samza.config._ -import org.apache.samza.container.TaskName -import org.apache.samza.container.grouper.stream.GroupByPartitionFactory -import org.apache.samza.coordinator.MockSystemFactory -import org.apache.samza.coordinator.stream.messages.SetMigrationMetaMessage -import org.apache.samza.coordinator.stream._ -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.serializers.CheckpointSerde -import org.apache.samza.storage.ChangelogPartitionManager -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.util._ -import org.apache.samza.Partition -import org.junit.Assert._ -import org.junit._ - -import scala.collection.JavaConversions._ -import scala.collection._ - -class TestKafkaCheckpointMigration { - - val checkpointTopic = "checkpoint-topic" - val serdeCheckpointTopic = "checkpoint-topic-invalid-serde" - val checkpointTopicConfig = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null) - val zkConnect: String = TestZKUtils.zookeeperConnect - var zkClient: ZkClient = null - val zkConnectionTimeout = 6000 - val zkSessionTimeout = 6000 - - val brokerId1 = 0 - val brokerId2 = 1 - val brokerId3 = 2 - val ports = TestUtils.choosePorts(3) - val (port1, port2, port3) = (ports(0), ports(1), ports(2)) - - val props1 = TestUtils.createBrokerConfig(brokerId1, port1) - props1.put("controlled.shutdown.enable", "true") - val props2 = TestUtils.createBrokerConfig(brokerId2, port2) - props1.put("controlled.shutdown.enable", "true") - val props3 = TestUtils.createBrokerConfig(brokerId3, port3) - props1.put("controlled.shutdown.enable", "true") - - val config = new java.util.HashMap[String, Object]() - val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3) - config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) - config.put("acks", "all") - config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") - config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString) - config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES) - val producerConfig = new KafkaProducerConfig("kafka", "i001", config) - val partition = new Partition(0) - val partition2 = new Partition(1) - val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123")) - val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345")) - var zookeeper: EmbeddedZookeeper = null - var server1: KafkaServer = null - var server2: KafkaServer = null - var server3: KafkaServer = null - var metadataStore: TopicMetadataStore = null - - val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName - - @Before - def beforeSetupServers { - zookeeper = new EmbeddedZookeeper(zkConnect) - server1 = TestUtils.createServer(new KafkaConfig(props1)) - server2 = TestUtils.createServer(new KafkaConfig(props2)) - server3 = TestUtils.createServer(new KafkaConfig(props3)) - metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name") - } - - @After - def afterCleanLogDirs { - server1.shutdown - server1.awaitShutdown() - server2.shutdown - server2.awaitShutdown() - server3.shutdown - server3.awaitShutdown() - Utils.rm(server1.config.logDirs) - Utils.rm(server2.config.logDirs) - Utils.rm(server3.config.logDirs) - zookeeper.shutdown - } - - private def writeChangeLogPartitionMapping(changelogMapping: Map[TaskName, Integer], cpTopic: String = checkpointTopic) = { - val producer: Producer[Array[Byte], Array[Byte]] = new KafkaProducer(producerConfig.getProducerProperties) - val record = new ProducerRecord( - cpTopic, - 0, - KafkaCheckpointLogKey.getChangelogPartitionMappingKey().toBytes(), - new CheckpointSerde().changelogPartitionMappingToBytes(changelogMapping) - ) - try { - producer.send(record).get() - } catch { - case e: Exception => println(e.getMessage) - } finally { - producer.close() - } - } - - @Test - def testMigrationWithNoCheckpointTopic() { - val mapConfig = Map[String, String]( - "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory", - JobConfig.JOB_NAME -> "test", - JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator", - JobConfig.JOB_CONTAINER_COUNT -> "2", - "task.inputs" -> "test.stream1", - "task.checkpoint.system" -> "test", - SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName, - "systems.test.producer.bootstrap.servers" -> brokers, - "systems.test.consumer.zookeeper.connect" -> zkConnect, - SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName) - - // Enable consumer caching - MockCoordinatorStreamSystemFactory.enableMockConsumerCache() - - val config: MapConfig = new MapConfig(mapConfig) - val migrate = new KafkaCheckpointMigration - migrate.migrate(config) - val consumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, new NoOpMetricsRegistry) - consumer.register() - consumer.start() - consumer.bootstrap() - val bootstrappedStream = consumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE) - assertEquals(1, bootstrappedStream.size()) - - val expectedMigrationMessage = new SetMigrationMetaMessage("CHECKPOINTMIGRATION", "CheckpointMigration09to10", "true") - assertEquals(expectedMigrationMessage, bootstrappedStream.head) - consumer.stop() - } - - @Test - def testMigration() { - try { - val mapConfig = Map( - "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory", - JobConfig.JOB_NAME -> "test", - JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator", - JobConfig.JOB_CONTAINER_COUNT -> "2", - "task.inputs" -> "test.stream1", - "task.checkpoint.system" -> "test", - "systems.test.producer.bootstrap.servers" -> brokers, - "systems.test.consumer.zookeeper.connect" -> zkConnect, - SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName, - SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName) - // Enable consumer caching - MockCoordinatorStreamSystemFactory.enableMockConsumerCache() - - val config = new MapConfig(mapConfig) - val checkpointTopicName = KafkaUtil.getCheckpointTopic("test", "1") - val checkpointManager = new KafkaCheckpointManagerFactory().getCheckpointManager(config, new NoOpMetricsRegistry).asInstanceOf[KafkaCheckpointManager] - - // Write a couple of checkpoints in the old checkpoint topic - val task1 = new TaskName(partition.toString) - val task2 = new TaskName(partition2.toString) - checkpointManager.start - checkpointManager.register(task1) - checkpointManager.register(task2) - checkpointManager.writeCheckpoint(task1, cp1) - checkpointManager.writeCheckpoint(task2, cp2) - - // Write changelog partition info to the old checkpoint topic - val changelogMapping = Map(task1 -> 1.asInstanceOf[Integer], task2 -> 10.asInstanceOf[Integer]) - writeChangeLogPartitionMapping(changelogMapping, checkpointTopicName) - checkpointManager.stop - - // Initialize coordinator stream - val coordinatorFactory = new CoordinatorStreamSystemFactory() - val coordinatorSystemConsumer = coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap) - val coordinatorSystemProducer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap) - coordinatorSystemConsumer.register() - coordinatorSystemConsumer.start() - - assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size, 0) - coordinatorSystemConsumer.stop - - // Start the migration - val migrationInstance = new KafkaCheckpointMigration - migrationInstance.migrate(config) - - // Verify if the changelogPartitionInfo has been migrated - val newChangelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "test") - newChangelogManager.start - val newChangelogMapping = newChangelogManager.readChangeLogPartitionMapping() - newChangelogManager.stop - assertEquals(newChangelogMapping.toMap, changelogMapping) - - // Check for migration message - coordinatorSystemConsumer.register() - coordinatorSystemConsumer.start() - assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size, 1) - coordinatorSystemConsumer.stop() - } - finally { - MockCoordinatorStreamSystemFactory.disableMockConsumerCache() - } - } - - class MockKafkaCheckpointMigration extends KafkaCheckpointMigration{ - var migrationCompletionMarkFlag: Boolean = false - var migrationVerificationMarkFlag: Boolean = false - - override def migrationCompletionMark(coordinatorStreamProducer: CoordinatorStreamSystemProducer) = { - migrationCompletionMarkFlag = true - super.migrationCompletionMark(coordinatorStreamProducer) - } - - override def migrationVerification(coordinatorStreamConsumer: CoordinatorStreamSystemConsumer): Boolean = { - migrationVerificationMarkFlag = true - super.migrationVerification(coordinatorStreamConsumer) - } - } -}