Repository: samza
Updated Branches:
  refs/heads/master 97a3be397 -> 944dd02e1


SAMZA-979 - Remove KafkaCheckpointMigration


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/74b0f840
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/74b0f840
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/74b0f840

Branch: refs/heads/master
Commit: 74b0f8405a73fedda7a0b4b517f3946027d15626
Parents: 97a3be3
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Fri Jul 29 12:58:13 2016 -0700
Committer: Navina Ramesh <nram...@linkedin.com>
Committed: Fri Jul 29 12:58:23 2016 -0700

----------------------------------------------------------------------
 .../apache/samza/checkpoint/OffsetManager.scala |   1 -
 .../scala/org/apache/samza/job/JobRunner.scala  |   4 -
 .../samza/migration/JobRunnerMigration.scala    |  58 -----
 .../org/apache/samza/job/TestJobRunner.scala    |  20 +-
 .../migration/KafkaCheckpointMigration.scala    | 149 ------------
 .../TestKafkaCheckpointMigration.scala          | 243 -------------------
 6 files changed, 1 insertion(+), 474 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 7245902..f8033c5 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -223,7 +223,6 @@ class OffsetManager(
         }
       }
 
-      partitionOffsets.foreach(p => info("task " + taskName + " checkpoint " + 
p._1 + ", " + p._2))
       checkpointManager.writeCheckpoint(taskName, new 
Checkpoint(partitionOffsets))
       Option(lastProcessedOffsets.get(taskName)) match {
         case Some(sspToOffsets) => sspToOffsets.foreach { case (ssp, 
checkpoint) => offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint) }

http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index a3613ff..383bb13 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -24,7 +24,6 @@ import org.apache.samza.config.{ConfigRewriter, Config}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
 import org.apache.samza.job.ApplicationStatus.Running
-import org.apache.samza.migration.JobRunnerMigration
 import org.apache.samza.util.CommandLine
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
@@ -122,9 +121,6 @@ class JobRunner(config: Config) extends Logging {
     }
     coordinatorSystemProducer.stop
 
-    // Perform any migration plan to run in job runner
-    JobRunnerMigration(config)
-
     // Create the actual job, and submit it.
     val job = jobFactory.getJob(config).submit
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala b/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
deleted file mode 100644
index f38b87a..0000000
--- a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.migration
-
-import org.apache.samza.config.Config
-import org.apache.samza.util.{Util, Logging}
-import org.apache.samza.SamzaException
-
-
-object JobRunnerMigration {
-  val CHECKPOINT_MIGRATION = 
"org.apache.samza.migration.KafkaCheckpointMigration"
-  val UNSUPPORTED_ERROR_MSG = "Auto checkpoint migration for 0.10.0 upgrade is 
only supported for Kafka checkpointing system, " +
-    "for everything else, please use the checkpoint tool to migrate the 
taskname-to-changelog mapping, and add " +
-    "task.checkpoint.skip-migration=true to your configs."
-  def apply(config: Config) = {
-    val migration = new JobRunnerMigration
-    migration.checkpointMigration(config)
-  }
-}
-
-class JobRunnerMigration extends Logging {
-
-  def checkpointMigration(config: Config) = {
-    val checkpointFactory = Option(config.get("task.checkpoint.factory"))
-    checkpointFactory match {
-      case 
Some("org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory") =>
-        info("Performing checkpoint migration")
-        val checkpointMigrationPlan = 
Util.getObj[MigrationPlan](JobRunnerMigration.CHECKPOINT_MIGRATION)
-        checkpointMigrationPlan.migrate(config)
-      case None =>
-        info("No task.checkpoint.factory defined, not performing any 
checkpoint migration")
-      case _ =>
-        val skipMigration = 
config.getBoolean("task.checkpoint.skip-migration", false)
-        if (skipMigration) {
-          info("Job is configured to skip any checkpoint migration.")
-        } else {
-          error(JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
-          throw new SamzaException(JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
-        }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index e97656a..17c5297 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -21,13 +21,11 @@ package org.apache.samza.job
 
 import java.io.File
 
-import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
-import org.apache.samza.migration.JobRunnerMigration
-import org.junit.Test
 import org.junit.After
 import org.junit.Assert._
+import org.junit.Test
 
 object TestJobRunner {
   var processCount = 0
@@ -41,22 +39,6 @@ class TestJobRunner {
   }
 
   @Test
-  def testJobRunnerMigrationFails {
-    MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
-
-    try {
-      JobRunner.main(Array(
-        "--config-factory",
-        "org.apache.samza.config.factories.PropertiesConfigFactory",
-        "--config-path",
-        "file://%s/src/test/resources/test-migration-fail.properties" format 
new File(".").getCanonicalPath))
-      fail("Should have failed already.")
-    } catch {
-      case se: SamzaException => assertEquals(se.getMessage, 
JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
-    }
-  }
-
-  @Test
   def testJobRunnerWorks {
     MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala b/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
deleted file mode 100644
index 5d2641a..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.migration
-
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient.ZkClient
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.kafka.{KafkaCheckpointManager, 
KafkaCheckpointManagerFactory}
-import org.apache.samza.config.Config
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.coordinator.stream.messages.{CoordinatorStreamMessage, 
SetMigrationMetaMessage}
-import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, 
CoordinatorStreamSystemFactory, CoordinatorStreamSystemProducer}
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.util._
-
-/**
- * Migrates changelog partition mapping from checkpoint topic to coordinator 
stream
- */
-class KafkaCheckpointMigration extends MigrationPlan with Logging {
-  val source = "CHECKPOINTMIGRATION"
-  val migrationKey = "CheckpointMigration09to10"
-  val migrationVal = "true"
-
-  var connectZk: () => ZkClient = null
-
-  private def getCheckpointSystemName(config: Config): String = {
-    config
-      .getCheckpointSystem
-      .getOrElse(throw new SamzaException("no system defined for Kafka's 
checkpoint manager."))
-  }
-
-  private def getClientId(config: Config): String = {
-    KafkaUtil.getClientId("samza-checkpoint-manager", config)
-  }
-
-  private def getTopicMetadataStore(config: Config): TopicMetadataStore = {
-    val clientId = getClientId(config)
-    val systemName = getCheckpointSystemName(config)
-
-    val producerConfig = config.getKafkaSystemProducerConfig(
-      systemName,
-      clientId,
-      KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
-
-    val consumerConfig = config.getKafkaSystemConsumerConfig(
-      systemName,
-      clientId)
-
-    new ClientUtilTopicMetadataStore(producerConfig.bootsrapServers, clientId, 
consumerConfig.socketTimeoutMs)
-  }
-
-  private def getConnectZk(config: Config): () => ZkClient = {
-    val clientId = getClientId(config)
-
-    val checkpointSystemName = getCheckpointSystemName(config)
-
-    val consumerConfig = config.getKafkaSystemConsumerConfig(
-      checkpointSystemName,
-      clientId)
-
-    val zkConnectString = Option(consumerConfig.zkConnect)
-      .getOrElse(throw new SamzaException("no zookeeper.connect defined in 
config"))
-    () => {
-      new ZkClient(zkConnectString, 6000, 6000, ZKStringSerializer)
-    }
-  }
-
-  override def migrate(config: Config) {
-    val jobName = config.getName.getOrElse(throw new SamzaException("Cannot 
find job name. Cannot proceed with migration."))
-    val jobId = config.getJobId.getOrElse("1")
-
-    val checkpointTopicName = KafkaUtil.getCheckpointTopic(jobName, jobId)
-
-    val coordinatorSystemFactory = new CoordinatorStreamSystemFactory
-    val coordinatorSystemConsumer = 
coordinatorSystemFactory.getCoordinatorStreamSystemConsumer(config, new 
MetricsRegistryMap)
-    val coordinatorSystemProducer = 
coordinatorSystemFactory.getCoordinatorStreamSystemProducer(config, new 
MetricsRegistryMap)
-
-    val checkpointManager = new 
KafkaCheckpointManagerFactory().getCheckpointManager(config, new 
NoOpMetricsRegistry).asInstanceOf[KafkaCheckpointManager]
-
-    val kafkaUtil = new KafkaUtil(new ExponentialSleepStrategy, 
getConnectZk(config))
-
-    // make sure to validate that we only perform migration when checkpoint 
topic exists
-    if (kafkaUtil.topicExists(checkpointTopicName)) {
-      kafkaUtil.validateTopicPartitionCount(
-        checkpointTopicName,
-        getCheckpointSystemName(config),
-        getTopicMetadataStore(config),
-        1,
-        config.failOnCheckpointValidation)
-
-      if (migrationVerification(coordinatorSystemConsumer)) {
-        info("Migration %s was already performed, doing nothing" format 
migrationKey)
-        return
-      }
-
-      info("No previous migration for %s were detected, performing migration" 
format migrationKey)
-
-      info("Loading changelog partition mapping from checkpoint topic - %s" 
format checkpointTopicName)
-      val changelogMap = checkpointManager.readChangeLogPartitionMapping()
-      checkpointManager.stop
-
-      info("Writing changelog to coordinator stream topic - %s" format 
Util.getCoordinatorStreamName(jobName, jobId))
-      val changelogPartitionManager = new 
ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, 
source)
-      changelogPartitionManager.start()
-      changelogPartitionManager.writeChangeLogPartitionMapping(changelogMap)
-      changelogPartitionManager.stop()
-    }
-    migrationCompletionMark(coordinatorSystemProducer)
-
-  }
-
-  def migrationVerification(coordinatorSystemConsumer : 
CoordinatorStreamSystemConsumer): Boolean = {
-    coordinatorSystemConsumer.register()
-    coordinatorSystemConsumer.start()
-    coordinatorSystemConsumer.bootstrap()
-    val stream = 
coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE)
-    val message = new SetMigrationMetaMessage(source, migrationKey, 
migrationVal)
-    stream.contains(message.asInstanceOf[CoordinatorStreamMessage])
-  }
-
-  def migrationCompletionMark(coordinatorSystemProducer: 
CoordinatorStreamSystemProducer) = {
-    info("Marking completion of migration %s" format migrationKey)
-    val message = new SetMigrationMetaMessage(source, migrationKey, 
migrationVal)
-    coordinatorSystemProducer.register(source)
-    coordinatorSystemProducer.start()
-    coordinatorSystemProducer.send(message)
-    coordinatorSystemProducer.stop()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala b/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala
deleted file mode 100644
index 504fc89..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.migration
-
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, TestZKUtils, Utils}
-import kafka.zk.EmbeddedZookeeper
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.producer.{KafkaProducer, Producer, 
ProducerConfig, ProducerRecord}
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.checkpoint.kafka.{KafkaCheckpointManager, 
KafkaCheckpointLogKey, KafkaCheckpointManagerFactory}
-import org.apache.samza.config._
-import org.apache.samza.container.TaskName
-import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
-import org.apache.samza.coordinator.MockSystemFactory
-import org.apache.samza.coordinator.stream.messages.SetMigrationMetaMessage
-import org.apache.samza.coordinator.stream._
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util._
-import org.apache.samza.Partition
-import org.junit.Assert._
-import org.junit._
-
-import scala.collection.JavaConversions._
-import scala.collection._
-
-class TestKafkaCheckpointMigration {
-
-  val checkpointTopic = "checkpoint-topic"
-  val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
-  val checkpointTopicConfig = 
KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null)
-  val zkConnect: String = TestZKUtils.zookeeperConnect
-  var zkClient: ZkClient = null
-  val zkConnectionTimeout = 6000
-  val zkSessionTimeout = 6000
-
-  val brokerId1 = 0
-  val brokerId2 = 1
-  val brokerId3 = 2
-  val ports = TestUtils.choosePorts(3)
-  val (port1, port2, port3) = (ports(0), ports(1), ports(2))
-
-  val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  props1.put("controlled.shutdown.enable", "true")
-  val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
-  props1.put("controlled.shutdown.enable", "true")
-  val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
-  props1.put("controlled.shutdown.enable", "true")
-
-  val config = new java.util.HashMap[String, Object]()
-  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, 
port3)
-  config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
-  config.put("acks", "all")
-  config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
-  config.put(ProducerConfig.RETRIES_CONFIG, (new 
Integer(java.lang.Integer.MAX_VALUE-1)).toString)
-  config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
-  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
-  val partition = new Partition(0)
-  val partition2 = new Partition(1)
-  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", 
partition) -> "123"))
-  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", 
partition) -> "12345"))
-  var zookeeper: EmbeddedZookeeper = null
-  var server1: KafkaServer = null
-  var server2: KafkaServer = null
-  var server3: KafkaServer = null
-  var metadataStore: TopicMetadataStore = null
-
-  val systemStreamPartitionGrouperFactoryString = 
classOf[GroupByPartitionFactory].getCanonicalName
-
-  @Before
-  def beforeSetupServers {
-    zookeeper = new EmbeddedZookeeper(zkConnect)
-    server1 = TestUtils.createServer(new KafkaConfig(props1))
-    server2 = TestUtils.createServer(new KafkaConfig(props2))
-    server3 = TestUtils.createServer(new KafkaConfig(props3))
-    metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
-  }
-
-  @After
-  def afterCleanLogDirs {
-    server1.shutdown
-    server1.awaitShutdown()
-    server2.shutdown
-    server2.awaitShutdown()
-    server3.shutdown
-    server3.awaitShutdown()
-    Utils.rm(server1.config.logDirs)
-    Utils.rm(server2.config.logDirs)
-    Utils.rm(server3.config.logDirs)
-    zookeeper.shutdown
-  }
-
-  private def writeChangeLogPartitionMapping(changelogMapping: Map[TaskName, 
Integer], cpTopic: String = checkpointTopic) = {
-    val producer: Producer[Array[Byte], Array[Byte]] = new 
KafkaProducer(producerConfig.getProducerProperties)
-    val record = new ProducerRecord(
-      cpTopic,
-      0,
-      KafkaCheckpointLogKey.getChangelogPartitionMappingKey().toBytes(),
-      new CheckpointSerde().changelogPartitionMappingToBytes(changelogMapping)
-    )
-    try {
-      producer.send(record).get()
-    } catch {
-      case e: Exception => println(e.getMessage)
-    } finally {
-      producer.close()
-    }
-  }
-
-  @Test
-  def testMigrationWithNoCheckpointTopic() {
-    val mapConfig = Map[String, String](
-      "task.checkpoint.factory" -> 
"org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
-      JobConfig.JOB_NAME -> "test",
-      JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
-      JobConfig.JOB_CONTAINER_COUNT -> "2",
-      "task.inputs" -> "test.stream1",
-      "task.checkpoint.system" -> "test",
-      SystemConfig.SYSTEM_FACTORY.format("test") -> 
classOf[MockSystemFactory].getCanonicalName,
-      "systems.test.producer.bootstrap.servers" -> brokers,
-      "systems.test.consumer.zookeeper.connect" -> zkConnect,
-      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> 
classOf[MockCoordinatorStreamSystemFactory].getName)
-
-    // Enable consumer caching
-    MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
-
-    val config: MapConfig = new MapConfig(mapConfig)
-    val migrate = new KafkaCheckpointMigration
-    migrate.migrate(config)
-    val consumer = new 
CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, new 
NoOpMetricsRegistry)
-    consumer.register()
-    consumer.start()
-    consumer.bootstrap()
-    val bootstrappedStream = 
consumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE)
-    assertEquals(1, bootstrappedStream.size())
-
-    val expectedMigrationMessage = new 
SetMigrationMetaMessage("CHECKPOINTMIGRATION", "CheckpointMigration09to10", 
"true")
-    assertEquals(expectedMigrationMessage, bootstrappedStream.head)
-    consumer.stop()
-  }
-
-  @Test
-  def testMigration() {
-    try {
-      val mapConfig = Map(
-        "task.checkpoint.factory" -> 
"org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
-        JobConfig.JOB_NAME -> "test",
-        JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
-        JobConfig.JOB_CONTAINER_COUNT -> "2",
-        "task.inputs" -> "test.stream1",
-        "task.checkpoint.system" -> "test",
-        "systems.test.producer.bootstrap.servers" -> brokers,
-        "systems.test.consumer.zookeeper.connect" -> zkConnect,
-        SystemConfig.SYSTEM_FACTORY.format("test") -> 
classOf[MockSystemFactory].getCanonicalName,
-        SystemConfig.SYSTEM_FACTORY.format("coordinator") -> 
classOf[MockCoordinatorStreamSystemFactory].getName)
-      // Enable consumer caching
-      MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
-
-      val config = new MapConfig(mapConfig)
-      val checkpointTopicName = KafkaUtil.getCheckpointTopic("test", "1")
-      val checkpointManager = new 
KafkaCheckpointManagerFactory().getCheckpointManager(config, new 
NoOpMetricsRegistry).asInstanceOf[KafkaCheckpointManager]
-
-      // Write a couple of checkpoints in the old checkpoint topic
-      val task1 = new TaskName(partition.toString)
-      val task2 = new TaskName(partition2.toString)
-      checkpointManager.start
-      checkpointManager.register(task1)
-      checkpointManager.register(task2)
-      checkpointManager.writeCheckpoint(task1, cp1)
-      checkpointManager.writeCheckpoint(task2, cp2)
-
-      // Write changelog partition info to the old checkpoint topic
-      val changelogMapping = Map(task1 -> 1.asInstanceOf[Integer], task2 -> 
10.asInstanceOf[Integer])
-      writeChangeLogPartitionMapping(changelogMapping, checkpointTopicName)
-      checkpointManager.stop
-
-      // Initialize coordinator stream
-      val coordinatorFactory = new CoordinatorStreamSystemFactory()
-      val coordinatorSystemConsumer = 
coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new 
MetricsRegistryMap)
-      val coordinatorSystemProducer = 
coordinatorFactory.getCoordinatorStreamSystemProducer(config, new 
MetricsRegistryMap)
-      coordinatorSystemConsumer.register()
-      coordinatorSystemConsumer.start()
-
-      
assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size,
 0)
-      coordinatorSystemConsumer.stop
-
-      // Start the migration
-      val migrationInstance = new KafkaCheckpointMigration
-      migrationInstance.migrate(config)
-
-      // Verify if the changelogPartitionInfo has been migrated
-      val newChangelogManager = new 
ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, 
"test")
-      newChangelogManager.start
-      val newChangelogMapping = 
newChangelogManager.readChangeLogPartitionMapping()
-      newChangelogManager.stop
-      assertEquals(newChangelogMapping.toMap, changelogMapping)
-
-      // Check for migration message
-      coordinatorSystemConsumer.register()
-      coordinatorSystemConsumer.start()
-      
assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size,
 1)
-      coordinatorSystemConsumer.stop()
-    }
-    finally {
-      MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
-    }
-  }
-
-  class MockKafkaCheckpointMigration extends KafkaCheckpointMigration{
-    var migrationCompletionMarkFlag: Boolean = false
-    var migrationVerificationMarkFlag: Boolean = false
-
-    override def migrationCompletionMark(coordinatorStreamProducer: 
CoordinatorStreamSystemProducer) = {
-      migrationCompletionMarkFlag = true
-      super.migrationCompletionMark(coordinatorStreamProducer)
-    }
-
-    override def migrationVerification(coordinatorStreamConsumer: 
CoordinatorStreamSystemConsumer): Boolean = {
-      migrationVerificationMarkFlag = true
-      super.migrationVerification(coordinatorStreamConsumer)
-    }
-  }
-}

Reply via email to