Repository: spark
Updated Branches:
  refs/heads/branch-1.6 34e824d90 -> 116b7158f


[SPARK-11198][STREAMING][KINESIS] Support de-aggregation of records during 
recovery

While the KCL handles de-aggregation during regular operation, during 
recovery we use the lower-level API and therefore need to de-aggregate the 
records ourselves.

@tdas Testing is an issue: we need protobuf magic to produce the aggregated 
records. Maybe we could depend on the KPL for tests?

Author: Burak Yavuz <brk...@gmail.com>

Closes #9403 from brkyvz/kinesis-deaggregation.

(cherry picked from commit 26062d22607e1f9854bc2588ba22a4e0f8bba48c)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/116b7158
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/116b7158
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/116b7158

Branch: refs/heads/branch-1.6
Commit: 116b7158fa27cf9dbd935be1f395c68d2f8928ec
Parents: 34e824d
Author: Burak Yavuz <brk...@gmail.com>
Authored: Mon Nov 9 17:18:49 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Nov 9 17:18:59 2015 -0800

----------------------------------------------------------------------
 extras/kinesis-asl/pom.xml                      |   6 +
 .../kinesis/KinesisBackedBlockRDD.scala         |   6 +-
 .../streaming/kinesis/KinesisReceiver.scala     |   1 -
 .../kinesis/KinesisRecordProcessor.scala        |   2 +-
 .../streaming/kinesis/KinesisTestUtils.scala    | 235 ----------------
 .../kinesis/KinesisBackedBlockRDDSuite.scala    |  12 +-
 .../streaming/kinesis/KinesisStreamSuite.scala  |  17 +-
 .../streaming/kinesis/KinesisTestUtils.scala    | 266 +++++++++++++++++++
 pom.xml                                         |   2 +
 9 files changed, 299 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/116b7158/extras/kinesis-asl/pom.xml
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml
index ef72d97..519a920 100644
--- a/extras/kinesis-asl/pom.xml
+++ b/extras/kinesis-asl/pom.xml
@@ -65,6 +65,12 @@
       <version>${aws.java.sdk.version}</version>
     </dependency>
     <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>amazon-kinesis-producer</artifactId>
+      <version>${aws.kinesis.producer.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/spark/blob/116b7158/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index 000897a..691c179 100644
--- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal
 
 import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
 import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
 import com.amazonaws.services.kinesis.model._
 
 import org.apache.spark._
@@ -210,7 +211,10 @@ class KinesisSequenceRangeIterator(
       s"getting records using shard iterator") {
         client.getRecords(getRecordsRequest)
       }
-    (getRecordsResult.getRecords.iterator().asScala, 
getRecordsResult.getNextShardIterator)
+    // De-aggregate records, if KPL was used in producing the records. The KCL 
automatically
+    // handles de-aggregation during regular operation. This code path is used 
during recovery
+    val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords)
+    (recordIterator.iterator().asScala, getRecordsResult.getNextShardIterator)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/116b7158/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 50993f1..97dbb91 100644
--- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -216,7 +216,6 @@ private[kinesis] class KinesisReceiver[T](
       val metadata = SequenceNumberRange(streamName, shardId,
         records.get(0).getSequenceNumber(), records.get(records.size() - 
1).getSequenceNumber())
       blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
-
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/116b7158/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index e381ffa..b5b76cb 100644
--- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -80,7 +80,7 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: 
KinesisReceiver[T], w
            *     more than once.
            */
           logError(s"Exception:  WorkerId $workerId encountered and exception 
while storing " +
-              " or checkpointing a batch for workerId $workerId and shardId 
$shardId.", e)
+              s" or checkpointing a batch for workerId $workerId and shardId 
$shardId.", e)
 
           /* Rethrow the exception to the Kinesis Worker that is managing this 
RecordProcessor. */
           throw e

http://git-wip-us.apache.org/repos/asf/spark/blob/116b7158/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
deleted file mode 100644
index 634bf94..0000000
--- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import scala.util.{Failure, Random, Success, Try}
-
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
-import com.amazonaws.services.dynamodbv2.document.DynamoDB
-import com.amazonaws.services.kinesis.AmazonKinesisClient
-import com.amazonaws.services.kinesis.model._
-
-import org.apache.spark.Logging
-
-/**
- * Shared utility methods for performing Kinesis tests that actually transfer 
data
- */
-private[kinesis] class KinesisTestUtils extends Logging {
-
-  val endpointUrl = KinesisTestUtils.endpointUrl
-  val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
-  val streamShardCount = 2
-
-  private val createStreamTimeoutSeconds = 300
-  private val describeStreamPollTimeSeconds = 1
-
-  @volatile
-  private var streamCreated = false
-
-  @volatile
-  private var _streamName: String = _
-
-  private lazy val kinesisClient = {
-    val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
-    client.setEndpoint(endpointUrl)
-    client
-  }
-
-  private lazy val dynamoDB = {
-    val dynamoDBClient = new AmazonDynamoDBClient(new 
DefaultAWSCredentialsProviderChain())
-    dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
-    new DynamoDB(dynamoDBClient)
-  }
-
-  def streamName: String = {
-    require(streamCreated, "Stream not yet created, call createStream() to 
create one")
-    _streamName
-  }
-
-  def createStream(): Unit = {
-    require(!streamCreated, "Stream already created")
-    _streamName = findNonExistentStreamName()
-
-    // Create a stream. The number of shards determines the provisioned 
throughput.
-    logInfo(s"Creating stream ${_streamName}")
-    val createStreamRequest = new CreateStreamRequest()
-    createStreamRequest.setStreamName(_streamName)
-    createStreamRequest.setShardCount(2)
-    kinesisClient.createStream(createStreamRequest)
-
-    // The stream is now being created. Wait for it to become active.
-    waitForStreamToBeActive(_streamName)
-    streamCreated = true
-    logInfo(s"Created stream ${_streamName}")
-  }
-
-  /**
-   * Push data to Kinesis stream and return a map of
-   * shardId -> seq of (data, seq number) pushed to corresponding shard
-   */
-  def pushData(testData: Seq[Int]): Map[String, Seq[(Int, String)]] = {
-    require(streamCreated, "Stream not yet created, call createStream() to 
create one")
-    val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, 
String)]]()
-
-    testData.foreach { num =>
-      val str = num.toString
-      val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
-        .withData(ByteBuffer.wrap(str.getBytes()))
-        .withPartitionKey(str)
-
-      val putRecordResult = kinesisClient.putRecord(putRecordRequest)
-      val shardId = putRecordResult.getShardId
-      val seqNumber = putRecordResult.getSequenceNumber()
-      val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
-        new ArrayBuffer[(Int, String)]())
-      sentSeqNumbers += ((num, seqNumber))
-    }
-
-    logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
-    shardIdToSeqNumbers.toMap
-  }
-
-  /**
-   * Expose a Python friendly API.
-   */
-  def pushData(testData: java.util.List[Int]): Unit = {
-    pushData(testData.asScala)
-  }
-
-  def deleteStream(): Unit = {
-    try {
-      if (streamCreated) {
-        kinesisClient.deleteStream(streamName)
-      }
-    } catch {
-      case e: Exception =>
-        logWarning(s"Could not delete stream $streamName")
-    }
-  }
-
-  def deleteDynamoDBTable(tableName: String): Unit = {
-    try {
-      val table = dynamoDB.getTable(tableName)
-      table.delete()
-      table.waitForDelete()
-    } catch {
-      case e: Exception =>
-        logWarning(s"Could not delete DynamoDB table $tableName")
-    }
-  }
-
-  private def describeStream(streamNameToDescribe: String): 
Option[StreamDescription] = {
-    try {
-      val describeStreamRequest = new 
DescribeStreamRequest().withStreamName(streamNameToDescribe)
-      val desc = 
kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
-      Some(desc)
-    } catch {
-      case rnfe: ResourceNotFoundException =>
-        None
-    }
-  }
-
-  private def findNonExistentStreamName(): String = {
-    var testStreamName: String = null
-    do {
-      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
-      testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}"
-    } while (describeStream(testStreamName).nonEmpty)
-    testStreamName
-  }
-
-  private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
-    val startTime = System.currentTimeMillis()
-    val endTime = startTime + 
TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds)
-    while (System.currentTimeMillis() < endTime) {
-      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
-      describeStream(streamNameToWaitFor).foreach { description =>
-        val streamStatus = description.getStreamStatus()
-        logDebug(s"\t- current state: $streamStatus\n")
-        if ("ACTIVE".equals(streamStatus)) {
-          return
-        }
-      }
-    }
-    require(false, s"Stream $streamName never became active")
-  }
-}
-
-private[kinesis] object KinesisTestUtils {
-
-  val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS"
-  val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL"
-  val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com";
-
-  lazy val shouldRunTests = {
-    val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
-    if (isEnvSet) {
-      // scalastyle:off println
-      // Print this so that they are easily visible on the console and not 
hidden in the log4j logs.
-      println(
-        s"""
-          |Kinesis tests that actually send data has been enabled by setting 
the environment
-          |variable $envVarNameForEnablingTests to 1. This will create Kinesis 
Streams and
-          |DynamoDB tables in AWS. Please be aware that this may incur some 
AWS costs.
-          |By default, the tests use the endpoint URL $defaultEndpointUrl to 
create Kinesis streams.
-          |To change this endpoint URL to a different region, you can set the 
environment variable
-          |$endVarNameForEndpoint to the desired endpoint URL
-          |(e.g. 
$endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com";).
-        """.stripMargin)
-      // scalastyle:on println
-    }
-    isEnvSet
-  }
-
-  lazy val endpointUrl = {
-    val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl)
-    // scalastyle:off println
-    // Print this so that they are easily visible on the console and not 
hidden in the log4j logs.
-    println(s"Using endpoint URL $url for creating Kinesis streams for tests.")
-    // scalastyle:on println
-    url
-  }
-
-  def isAWSCredentialsPresent: Boolean = {
-    Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
-  }
-
-  def getAWSCredentials(): AWSCredentials = {
-    assert(shouldRunTests,
-      "Kinesis test not enabled, should not attempt to get AWS credentials")
-    Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
-      case Success(cred) => cred
-      case Failure(e) =>
-        throw new Exception(
-          s"""
-             |Kinesis tests enabled using environment variable 
$envVarNameForEnablingTests
-             |but could not find AWS credentials. Please follow instructions 
in AWS documentation
-             |to set the credentials in your system such that the 
DefaultAWSCredentialsProviderChain
-             |can find the credentials.
-           """.stripMargin)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/116b7158/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
 
b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index 9f9e146..52c61df 100644
--- 
a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ 
b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -22,7 +22,8 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, 
StreamBlockId}
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 
-class KinesisBackedBlockRDDSuite extends KinesisFunSuite with 
BeforeAndAfterAll {
+abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
+  extends KinesisFunSuite with BeforeAndAfterAll {
 
   private val testData = 1 to 8
 
@@ -37,13 +38,12 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite 
with BeforeAndAfterAll
   private var sc: SparkContext = null
   private var blockManager: BlockManager = null
 
-
   override def beforeAll(): Unit = {
     runIfTestsEnabled("Prepare KinesisTestUtils") {
       testUtils = new KinesisTestUtils()
       testUtils.createStream()
 
-      shardIdToDataAndSeqNumbers = testUtils.pushData(testData)
+      shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = 
aggregateTestData)
       require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to 
multiple shards")
 
       shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
@@ -247,3 +247,9 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite 
with BeforeAndAfterAll
     Array.tabulate(num) { i => new StreamBlockId(0, i) }
   }
 }
+
+class WithAggregationKinesisBackedBlockRDDSuite
+  extends KinesisBackedBlockRDDTests(aggregateTestData = true)
+
+class WithoutAggregationKinesisBackedBlockRDDSuite
+  extends KinesisBackedBlockRDDTests(aggregateTestData = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/116b7158/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
 
b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index ba84e55..dee3044 100644
--- 
a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ 
b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
 import org.apache.spark.util.Utils
 import org.apache.spark.{SparkConf, SparkContext}
 
-class KinesisStreamSuite extends KinesisFunSuite
+abstract class KinesisStreamTests(aggregateTestData: Boolean) extends 
KinesisFunSuite
   with Eventually with BeforeAndAfter with BeforeAndAfterAll {
 
   // This is the name that KCL will use to save metadata to DynamoDB
@@ -182,13 +182,13 @@ class KinesisStreamSuite extends KinesisFunSuite
     val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
     stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
       collected ++= rdd.collect()
-      logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
+      logInfo("Collected = " + collected.mkString(", "))
     }
     ssc.start()
 
     val testData = 1 to 10
     eventually(timeout(120 seconds), interval(10 second)) {
-      testUtils.pushData(testData)
+      testUtils.pushData(testData, aggregateTestData)
       assert(collected === testData.toSet, "\nData received does not match 
data sent")
     }
     ssc.stop(stopSparkContext = false)
@@ -207,13 +207,13 @@ class KinesisStreamSuite extends KinesisFunSuite
     val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
     stream.foreachRDD { rdd =>
       collected ++= rdd.collect()
-      logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
+      logInfo("Collected = " + collected.mkString(", "))
     }
     ssc.start()
 
     val testData = 1 to 10
     eventually(timeout(120 seconds), interval(10 second)) {
-      testUtils.pushData(testData)
+      testUtils.pushData(testData, aggregateTestData)
       val modData = testData.map(_ + 5)
       assert(collected === modData.toSet, "\nData received does not match data 
sent")
     }
@@ -254,7 +254,7 @@ class KinesisStreamSuite extends KinesisFunSuite
     // If this times out because numBatchesWithData is empty, then its likely 
that foreachRDD
     // function failed with exceptions, and nothing got added to 
`collectedData`
     eventually(timeout(2 minutes), interval(1 seconds)) {
-      testUtils.pushData(1 to 5)
+      testUtils.pushData(1 to 5, aggregateTestData)
       assert(isCheckpointPresent && numBatchesWithData > 10)
     }
     ssc.stop(stopSparkContext = true)  // stop the SparkContext so that the 
blocks are not reused
@@ -285,5 +285,8 @@ class KinesisStreamSuite extends KinesisFunSuite
     }
     ssc.stop()
   }
-
 }
+
+class WithAggregationKinesisStreamSuite extends 
KinesisStreamTests(aggregateTestData = true)
+
+class WithoutAggregationKinesisStreamSuite extends 
KinesisStreamTests(aggregateTestData = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/116b7158/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
 
b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
new file mode 100644
index 0000000..7487aa1
--- /dev/null
+++ 
b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Random, Success, Try}
+
+import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
+import com.amazonaws.services.dynamodbv2.document.DynamoDB
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.model._
+import com.amazonaws.services.kinesis.producer.{KinesisProducer, 
KinesisProducerConfiguration, UserRecordResult}
+import com.google.common.util.concurrent.{FutureCallback, Futures}
+
+import org.apache.spark.Logging
+
+/**
+ * Shared utility methods for performing Kinesis tests that actually transfer 
data
+ */
+private[kinesis] class KinesisTestUtils extends Logging {
+
+  val endpointUrl = KinesisTestUtils.endpointUrl
+  val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+  val streamShardCount = 2
+
+  private val createStreamTimeoutSeconds = 300
+  private val describeStreamPollTimeSeconds = 1
+
+  @volatile
+  private var streamCreated = false
+
+  @volatile
+  private var _streamName: String = _
+
+  private lazy val kinesisClient = {
+    val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
+    client.setEndpoint(endpointUrl)
+    client
+  }
+
+  private lazy val dynamoDB = {
+    val dynamoDBClient = new AmazonDynamoDBClient(new 
DefaultAWSCredentialsProviderChain())
+    dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
+    new DynamoDB(dynamoDBClient)
+  }
+
+  private lazy val kinesisProducer: KinesisProducer = {
+    val conf = new KinesisProducerConfiguration()
+      .setRecordMaxBufferedTime(1000)
+      .setMaxConnections(1)
+      .setRegion(regionName)
+      .setMetricsLevel("none")
+
+    new KinesisProducer(conf)
+  }
+
+  def streamName: String = {
+    require(streamCreated, "Stream not yet created, call createStream() to 
create one")
+    _streamName
+  }
+
+  def createStream(): Unit = {
+    require(!streamCreated, "Stream already created")
+    _streamName = findNonExistentStreamName()
+
+    // Create a stream. The number of shards determines the provisioned 
throughput.
+    logInfo(s"Creating stream ${_streamName}")
+    val createStreamRequest = new CreateStreamRequest()
+    createStreamRequest.setStreamName(_streamName)
+    createStreamRequest.setShardCount(2)
+    kinesisClient.createStream(createStreamRequest)
+
+    // The stream is now being created. Wait for it to become active.
+    waitForStreamToBeActive(_streamName)
+    streamCreated = true
+    logInfo(s"Created stream ${_streamName}")
+  }
+
+  /**
+   * Push data to Kinesis stream and return a map of
+   * shardId -> seq of (data, seq number) pushed to corresponding shard
+   */
+  def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, 
String)]] = {
+    require(streamCreated, "Stream not yet created, call createStream() to 
create one")
+    val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, 
String)]]()
+
+    testData.foreach { num =>
+      val str = num.toString
+      val data = ByteBuffer.wrap(str.getBytes())
+      if (aggregate) {
+        val future = kinesisProducer.addUserRecord(streamName, str, data)
+        val kinesisCallBack = new FutureCallback[UserRecordResult]() {
+          override def onFailure(t: Throwable): Unit = {} // do nothing
+
+          override def onSuccess(result: UserRecordResult): Unit = {
+            val shardId = result.getShardId
+            val seqNumber = result.getSequenceNumber()
+            val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+              new ArrayBuffer[(Int, String)]())
+            sentSeqNumbers += ((num, seqNumber))
+          }
+        }
+
+        Futures.addCallback(future, kinesisCallBack)
+        kinesisProducer.flushSync() // make sure we send all data before 
returning the map
+      } else {
+        val putRecordRequest = new 
PutRecordRequest().withStreamName(streamName)
+          .withData(data)
+          .withPartitionKey(str)
+
+        val putRecordResult = kinesisClient.putRecord(putRecordRequest)
+        val shardId = putRecordResult.getShardId
+        val seqNumber = putRecordResult.getSequenceNumber()
+        val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+          new ArrayBuffer[(Int, String)]())
+        sentSeqNumbers += ((num, seqNumber))
+      }
+    }
+
+    logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
+    shardIdToSeqNumbers.toMap
+  }
+
+  /**
+   * Expose a Python friendly API.
+   */
+  def pushData(testData: java.util.List[Int]): Unit = {
+    pushData(testData.asScala, aggregate = false)
+  }
+
+  def deleteStream(): Unit = {
+    try {
+      if (streamCreated) {
+        kinesisClient.deleteStream(streamName)
+      }
+    } catch {
+      case e: Exception =>
+        logWarning(s"Could not delete stream $streamName")
+    }
+  }
+
+  def deleteDynamoDBTable(tableName: String): Unit = {
+    try {
+      val table = dynamoDB.getTable(tableName)
+      table.delete()
+      table.waitForDelete()
+    } catch {
+      case e: Exception =>
+        logWarning(s"Could not delete DynamoDB table $tableName")
+    }
+  }
+
+  private def describeStream(streamNameToDescribe: String): 
Option[StreamDescription] = {
+    try {
+      val describeStreamRequest = new 
DescribeStreamRequest().withStreamName(streamNameToDescribe)
+      val desc = 
kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
+      Some(desc)
+    } catch {
+      case rnfe: ResourceNotFoundException =>
+        None
+    }
+  }
+
+  private def findNonExistentStreamName(): String = {
+    var testStreamName: String = null
+    do {
+      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
+      testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}"
+    } while (describeStream(testStreamName).nonEmpty)
+    testStreamName
+  }
+
+  private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
+    val startTime = System.currentTimeMillis()
+    val endTime = startTime + 
TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds)
+    while (System.currentTimeMillis() < endTime) {
+      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
+      describeStream(streamNameToWaitFor).foreach { description =>
+        val streamStatus = description.getStreamStatus()
+        logDebug(s"\t- current state: $streamStatus\n")
+        if ("ACTIVE".equals(streamStatus)) {
+          return
+        }
+      }
+    }
+    require(false, s"Stream $streamName never became active")
+  }
+}
+
+private[kinesis] object KinesisTestUtils {
+
+  val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS"
+  val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL"
+  val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com";
+
+  lazy val shouldRunTests = {
+    val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
+    if (isEnvSet) {
+      // scalastyle:off println
+      // Print this so that they are easily visible on the console and not 
hidden in the log4j logs.
+      println(
+        s"""
+          |Kinesis tests that actually send data has been enabled by setting 
the environment
+          |variable $envVarNameForEnablingTests to 1. This will create Kinesis 
Streams and
+          |DynamoDB tables in AWS. Please be aware that this may incur some 
AWS costs.
+          |By default, the tests use the endpoint URL $defaultEndpointUrl to 
create Kinesis streams.
+          |To change this endpoint URL to a different region, you can set the 
environment variable
+          |$endVarNameForEndpoint to the desired endpoint URL
+          |(e.g. 
$endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com";).
+        """.stripMargin)
+      // scalastyle:on println
+    }
+    isEnvSet
+  }
+
+  lazy val endpointUrl = {
+    val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl)
+    // scalastyle:off println
+    // Print this so that they are easily visible on the console and not 
hidden in the log4j logs.
+    println(s"Using endpoint URL $url for creating Kinesis streams for tests.")
+    // scalastyle:on println
+    url
+  }
+
+  def isAWSCredentialsPresent: Boolean = {
+    Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
+  }
+
+  def getAWSCredentials(): AWSCredentials = {
+    assert(shouldRunTests,
+      "Kinesis test not enabled, should not attempt to get AWS credentials")
+    Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
+      case Success(cred) => cred
+      case Failure(e) =>
+        throw new Exception(
+          s"""
+             |Kinesis tests enabled using environment variable 
$envVarNameForEnablingTests
+             |but could not find AWS credentials. Please follow instructions 
in AWS documentation
+             |to set the credentials in your system such that the 
DefaultAWSCredentialsProviderChain
+             |can find the credentials.
+           """.stripMargin)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/116b7158/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4ed1c0c..fd8c773 100644
--- a/pom.xml
+++ b/pom.xml
@@ -154,6 +154,8 @@
     <jets3t.version>0.7.1</jets3t.version>
     <aws.java.sdk.version>1.9.40</aws.java.sdk.version>
     <aws.kinesis.client.version>1.4.0</aws.kinesis.client.version>
+    <!-- the producer is used in tests -->
+    <aws.kinesis.producer.version>0.10.1</aws.kinesis.producer.version>
     <!--  org.apache.httpcomponents/httpclient-->
     <commons.httpclient.version>4.3.2</commons.httpclient.version>
     <!--  commons-httpclient/commons-httpclient-->


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to