[GitHub] spark pull request: [SPARK-7838] [STREAMING] Set scope for kinesis...
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/6369#issuecomment-104940840 good catch, TD. didn't even know this concept existed. do kafka or other impls need this? should we add this to the Custom Receiver guide? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7788] Made KinesisReceiver.onStart() no...
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/6348#issuecomment-104845837 looks good
[GitHub] spark pull request: [SPARK-7788] Made KinesisReceiver.onStart() no...
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/6348#issuecomment-104787031 otherwise, looks good
[GitHub] spark pull request: [SPARK-7788] Made KinesisReceiver.onStart() no...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/6348#discussion_r30934351
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -126,8 +131,19 @@ private[kinesis] class KinesisReceiver(
     }
     worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
-    worker.run()
-
+    workerThread = new Thread() {
+      override def run(): Unit = {
+        try {
+          worker.run()
+        } catch {
+          case NonFatal(e) =>
+            restart("Error running the KCL worker in Receiver", e)
+        }
+      }
+    }
+    workerThread.setName("Kinesis Receiver")
--- End diff --
is there a need to distinguish the thread names by receiver Id or some such?
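One way to answer the thread-naming question is to put the receiver's stream id into the thread name. Below is a minimal standalone sketch under stated assumptions: `startWorkerThread` and its parameters are hypothetical, the `work` callback stands in for the KCL `Worker.run()`, and `onError` stands in for the receiver's `restart(...)`. In an actual Spark `Receiver`, the id would presumably come from its `streamId`.

```scala
import scala.util.control.NonFatal

object WorkerThreadSketch {
  // Hypothetical helper: runs a blocking body (standing in for the KCL
  // Worker.run()) on its own thread whose name includes the stream id, so
  // threads from several Kinesis receivers on one executor can be told apart.
  def startWorkerThread(streamId: Int, work: () => Unit, onError: Throwable => Unit): Thread = {
    val t = new Thread(s"Kinesis Receiver $streamId") {
      override def run(): Unit = {
        try {
          work()
        } catch {
          // As in the diff: non-fatal errors are handed off instead of
          // silently killing the thread (in the receiver: restart(...)).
          case NonFatal(e) => onError(e)
        }
      }
    }
    // Assumption: a daemon thread, so it never blocks JVM shutdown on its own.
    t.setDaemon(true)
    t.start()
    t
  }
}
```

A thread dump would then show names such as `Kinesis Receiver 0` and `Kinesis Receiver 1` rather than several identical `Kinesis Receiver` entries.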
[GitHub] spark pull request: [SPARK-7722] [STREAMING] Added Kinesis to styl...
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/6325#issuecomment-104409718 LGTM
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656] Ability t...
Github user cfregly closed the pull request at: https://github.com/apache/spark/pull/5882
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656][SPARK-767...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/6147#discussion_r30775440
--- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala ---
@@ -40,6 +40,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorC
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
 import com.amazonaws.services.kinesis.model.Record
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
--- End diff --
i don't think this import is needed anymore. please remove
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656][SPARK-767...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/6147#discussion_r30775370 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -79,19 +181,116 @@ object KinesisUtils { * per Kinesis' limit of 24 hours * (InitialPositionInStream.TRIM_HORIZON) or * the tip of the stream (InitialPositionInStream.LATEST). - * @param storageLevel Storage level to use for storing the received objects + * @param storageLevel Storage level to use for storing the received objects. + * StorageLevel.MEMORY_AND_DISK_2 is recommended. + */ + def createStream( + jssc: JavaStreamingContext, + kinesisAppName: String, + streamName: String, + endpointUrl: String, + regionName: String, + initialPositionInStream: InitialPositionInStream, + checkpointInterval: Duration, + storageLevel: StorageLevel +): JavaReceiverInputDStream[Array[Byte]] = { +createStream(jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, + initialPositionInStream, checkpointInterval, storageLevel) + } + + /** + * Create an input stream that pulls messages from a Kinesis stream. + * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. * - * @return JavaReceiverInputDStream[Array[Byte]] + * Note: + * The given AWS credentials will get saved in DStream checkpoints if checkpointing + * is enabled. Make sure that your checkpoint directory is secure. 
+ * + * @param jssc Java StreamingContext object + * @param kinesisAppName Kinesis application name used by the Kinesis Client Library + *(KCL) to update DynamoDB + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param regionName Name of region used by the Kinesis Client Library (KCL) to update + * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain) + * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain) + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. + *See the Kinesis Spark Streaming documentation for more + *details on the different types of checkpoints. + * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or + * the tip of the stream (InitialPositionInStream.LATEST). + * @param storageLevel Storage level to use for storing the received objects. + * StorageLevel.MEMORY_AND_DISK_2 is recommended. 
*/ - @Experimental def createStream( - jssc: JavaStreamingContext, - streamName: String, - endpointUrl: String, + jssc: JavaStreamingContext, + kinesisAppName: String, + streamName: String, + endpointUrl: String, + regionName: String, + initialPositionInStream: InitialPositionInStream, + checkpointInterval: Duration, + storageLevel: StorageLevel, + awsAccessKeyId: String, + awsSecretKey: String +): JavaReceiverInputDStream[Array[Byte]] = { +createStream(jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, +initialPositionInStream, checkpointInterval, storageLevel, awsAccessKeyId, awsSecretKey) + } + + /** + * Create an input stream that pulls messages from a Kinesis stream. + * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. + * + * Note: + * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain + * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain + * gets AWS credentials. + * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch. + * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in + * [[org.apache.
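The Java overloads above document that a null `awsAccessKeyId` or `awsSecretKey` means "use DefaultAWSCredentialsProviderChain". Below is a minimal sketch of how nullable Java arguments might be turned into the kind of optional credentials (`awsCredentialsOption`) that the reworked `KinesisReceiver` takes. `BasicCredentials` and `toCredentialsOption` are hypothetical names. Rejecting a half-specified pair is an assumption; the diff does not show that behavior.

```scala
object CredentialsSketch {
  // Hypothetical serializable holder for explicit keys (not an AWS SDK class).
  // If it travels with the receiver, it ends up in DStream checkpoints, hence
  // the "make sure your checkpoint directory is secure" note above.
  case class BasicCredentials(accessKeyId: String, secretKey: String)

  // None    => fall back to DefaultAWSCredentialsProviderChain on the workers
  // Some(_) => use the explicitly supplied keys
  def toCredentialsOption(awsAccessKeyId: String, awsSecretKey: String): Option[BasicCredentials] =
    (Option(awsAccessKeyId), Option(awsSecretKey)) match {
      case (Some(id), Some(key)) => Some(BasicCredentials(id, key))
      case (None, None)          => None
      // Assumption: a half-specified pair is treated as a caller error.
      case _ => throw new IllegalArgumentException(
        "awsAccessKeyId and awsSecretKey must both be set or both be null")
    }
}
```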
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656][SPARK-767...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/6147#discussion_r30775347 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -79,19 +181,116 @@ object KinesisUtils { * per Kinesis' limit of 24 hours * (InitialPositionInStream.TRIM_HORIZON) or * the tip of the stream (InitialPositionInStream.LATEST). - * @param storageLevel Storage level to use for storing the received objects + * @param storageLevel Storage level to use for storing the received objects. + * StorageLevel.MEMORY_AND_DISK_2 is recommended. + */ + def createStream( + jssc: JavaStreamingContext, + kinesisAppName: String, + streamName: String, + endpointUrl: String, + regionName: String, + initialPositionInStream: InitialPositionInStream, + checkpointInterval: Duration, + storageLevel: StorageLevel +): JavaReceiverInputDStream[Array[Byte]] = { +createStream(jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, + initialPositionInStream, checkpointInterval, storageLevel) + } + + /** + * Create an input stream that pulls messages from a Kinesis stream. + * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. * - * @return JavaReceiverInputDStream[Array[Byte]] + * Note: + * The given AWS credentials will get saved in DStream checkpoints if checkpointing + * is enabled. Make sure that your checkpoint directory is secure. 
+ * + * @param jssc Java StreamingContext object + * @param kinesisAppName Kinesis application name used by the Kinesis Client Library + *(KCL) to update DynamoDB + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param regionName Name of region used by the Kinesis Client Library (KCL) to update + * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain) + * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain) + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. + *See the Kinesis Spark Streaming documentation for more + *details on the different types of checkpoints. + * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or + * the tip of the stream (InitialPositionInStream.LATEST). + * @param storageLevel Storage level to use for storing the received objects. + * StorageLevel.MEMORY_AND_DISK_2 is recommended. 
*/ - @Experimental def createStream( - jssc: JavaStreamingContext, - streamName: String, - endpointUrl: String, + jssc: JavaStreamingContext, + kinesisAppName: String, + streamName: String, + endpointUrl: String, + regionName: String, + initialPositionInStream: InitialPositionInStream, + checkpointInterval: Duration, + storageLevel: StorageLevel, + awsAccessKeyId: String, + awsSecretKey: String +): JavaReceiverInputDStream[Array[Byte]] = { +createStream(jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, +initialPositionInStream, checkpointInterval, storageLevel, awsAccessKeyId, awsSecretKey) + } + + /** + * Create an input stream that pulls messages from a Kinesis stream. + * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. + * + * Note: + * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain + * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain + * gets AWS credentials. + * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch. + * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in + * [[org.apache.
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656][SPARK-767...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/6147#discussion_r30775257
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -79,19 +181,116 @@ object KinesisUtils {
    *                                 per Kinesis' limit of 24 hours
--- End diff --
missing initialPosition, in wrong place per the others
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656][SPARK-767...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/6147#discussion_r30775310 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -79,19 +181,116 @@ object KinesisUtils { * per Kinesis' limit of 24 hours * (InitialPositionInStream.TRIM_HORIZON) or * the tip of the stream (InitialPositionInStream.LATEST). - * @param storageLevel Storage level to use for storing the received objects + * @param storageLevel Storage level to use for storing the received objects. + * StorageLevel.MEMORY_AND_DISK_2 is recommended. + */ + def createStream( + jssc: JavaStreamingContext, + kinesisAppName: String, + streamName: String, + endpointUrl: String, + regionName: String, + initialPositionInStream: InitialPositionInStream, + checkpointInterval: Duration, + storageLevel: StorageLevel +): JavaReceiverInputDStream[Array[Byte]] = { +createStream(jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, + initialPositionInStream, checkpointInterval, storageLevel) + } + + /** + * Create an input stream that pulls messages from a Kinesis stream. + * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. * - * @return JavaReceiverInputDStream[Array[Byte]] + * Note: + * The given AWS credentials will get saved in DStream checkpoints if checkpointing + * is enabled. Make sure that your checkpoint directory is secure. 
+ * + * @param jssc Java StreamingContext object + * @param kinesisAppName Kinesis application name used by the Kinesis Client Library + *(KCL) to update DynamoDB + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param regionName Name of region used by the Kinesis Client Library (KCL) to update + * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain) --- End diff -- move these to the end
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656][SPARK-767...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/6147#discussion_r30775220
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -48,28 +94,84 @@ object KinesisUtils {
    *                                 per Kinesis' limit of 24 hours
--- End diff --
is the @param initialPositionInStream missing here, as well?
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656][SPARK-767...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/6147#discussion_r30775176 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -16,29 +16,75 @@ */ package org.apache.spark.streaming.kinesis -import org.apache.spark.annotation.Experimental +import com.amazonaws.regions.RegionUtils +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.Duration -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream -import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream - -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.streaming.{Duration, StreamingContext} -/** - * Helper class to create Amazon Kinesis Input Stream - * :: Experimental :: - */ -@Experimental object KinesisUtils { /** - * Create an InputDStream that pulls messages from a Kinesis stream. - * :: Experimental :: - * @param sscStreamingContext object + * Create an input stream that pulls messages from a Kinesis stream. + * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. + * + * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain + * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain + * gets the AWS credentials. 
+ * + * @param ssc StreamingContext object + * @param kinesisAppName Kinesis application name used by the Kinesis Client Library + *(KCL) to update DynamoDB + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param regionName Name of region used by the Kinesis Client Library (KCL) to update + * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or + * the tip of the stream (InitialPositionInStream.LATEST). + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. --- End diff -- not sure why i keep thinking this checkpointInterval should go above initialPositionInStream. not a big deal, but i remember it being different for some reason.
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656][SPARK-767...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/6147#discussion_r30775126 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -16,29 +16,75 @@ */ package org.apache.spark.streaming.kinesis -import org.apache.spark.annotation.Experimental +import com.amazonaws.regions.RegionUtils +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.Duration -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream -import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream - -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.streaming.{Duration, StreamingContext} -/** - * Helper class to create Amazon Kinesis Input Stream - * :: Experimental :: - */ -@Experimental object KinesisUtils { /** - * Create an InputDStream that pulls messages from a Kinesis stream. - * :: Experimental :: - * @param sscStreamingContext object + * Create an input stream that pulls messages from a Kinesis stream. + * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. + * + * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain + * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain + * gets the AWS credentials. 
+ * + * @param ssc StreamingContext object + * @param kinesisAppName Kinesis application name used by the Kinesis Client Library + *(KCL) to update DynamoDB + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param regionName Name of region used by the Kinesis Client Library (KCL) to update + * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or + * the tip of the stream (InitialPositionInStream.LATEST). + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. + *See the Kinesis Spark Streaming documentation for more + *details on the different types of checkpoints. + * @param storageLevel Storage level to use for storing the received objects. + * StorageLevel.MEMORY_AND_DISK_2 is recommended. + */ + def createStream( + ssc: StreamingContext, + kinesisAppName: String, + streamName: String, + endpointUrl: String, + regionName: String, + initialPositionInStream: InitialPositionInStream, + checkpointInterval: Duration, + storageLevel: StorageLevel +): ReceiverInputDStream[Array[Byte]] = { +ssc.receiverStream( + new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName), +initialPositionInStream, checkpointInterval, storageLevel, None)) + } + + /** + * Create an input stream that pulls messages from a Kinesis stream. + * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. + * + * Note: + * The given AWS credentials will get saved in DStream checkpoints if checkpointing + * is enabled. Make sure that your checkpoint directory is secure. 
+ * + * @param ssc StreamingContext object + * @param kinesisAppName Kinesis application name used by the Kinesis Client Library + *(KCL) to update DynamoDB * @param streamName Kinesis stream name * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param regionName Name of region used by the Kinesis Client Library (KCL) to update + * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + * @param awsAccess
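The Scala overload above passes `regionName` through `validateRegion(...)`, and the diff adds an import of `com.amazonaws.regions.RegionUtils`, but the helper's body is not shown. Below is a standalone sketch of the idea, assuming the goal is to fail fast on the driver before the receiver is shipped to an executor. The hard-coded region set is a hypothetical, deliberately incomplete stand-in for the AWS SDK's region metadata.

```scala
object RegionValidationSketch {
  // Hypothetical stand-in for the SDK's region lookup; NOT a complete list.
  private val knownRegions = Set("us-east-1", "us-west-2", "eu-west-1", "ap-northeast-1")

  // Returns the region name unchanged if it is recognized; otherwise throws
  // on the driver, so a typo surfaces at createStream time rather than as a
  // repeatedly restarting receiver.
  def validateRegion(regionName: String): String =
    Option(regionName).filter(knownRegions.contains).getOrElse {
      throw new IllegalArgumentException(s"Region name '$regionName' is not valid")
    }
}
```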
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656][SPARK-767...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/6147#discussion_r30775061
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -190,7 +198,7 @@ private[kinesis] object KinesisRecordProcessor extends Logging {
           logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
           retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
         }
-        /* Throw: Shutdown has been requested by the Kinesis Client Library.*/
+        /* Throw: Shutdown has been requested by the Kinesis Client Library. */
--- End diff --
// instead of /* */
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656][SPARK-767...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/6147#discussion_r30775051
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala ---
@@ -116,7 +124,7 @@ private[kinesis] class KinesisRecordProcessor(
           logError(s"Exception: WorkerId $workerId encountered and exception while storing " +
               " or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
-          /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor.*/
+          /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
--- End diff --
use // instead of /* */
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656][SPARK-767...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/6147#discussion_r30774982
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -59,92 +60,103 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
  *                                 (InitialPositionInStream.TRIM_HORIZON) or
  *                                 the tip of the stream (InitialPositionInStream.LATEST).
  * @param storageLevel Storage level to use for storing the received objects
- *
- * @return ReceiverInputDStream[Array[Byte]]
+ * @param awsCredentialsOption Optional AWS credentials, used when user directly specifies
+ *                             the credentials
  */
 private[kinesis] class KinesisReceiver(
     appName: String,
     streamName: String,
     endpointUrl: String,
-    checkpointInterval: Duration,
+    regionName: String,
     initialPositionInStream: InitialPositionInStream,
-    storageLevel: StorageLevel)
-  extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
-
-  /*
-   * The following vars are built in the onStart() method which executes in the Spark Worker after
-   * this code is serialized and shipped remotely.
-   */
-
-  /*
-   * workerId should be based on the ip address of the actual Spark Worker where this code runs
-   * (not the Driver's ip address.)
-   */
-  var workerId: String = null
+    checkpointInterval: Duration,
--- End diff --
i think this goes above initialPositionInStream
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656][SPARK-767...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/6147#discussion_r30774960
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -59,92 +60,103 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
  *                                 (InitialPositionInStream.TRIM_HORIZON) or
--- End diff --
@param initialPositionInStream seems to be missing also, it appears to have moved below checkpointInterval which is a bit confusing. i hit this while testing. the @param docs may need to be re-ordered, as well
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656][SPARK-767...
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/6147#issuecomment-102827199 @tdas: Looks like you forgot to bring over the docs/streaming-kinesis-integration.md and 2 examples (JavaKinesisWordCountASL and KinesisWordCountASL) changes from my original PR: https://github.com/apache/spark/pull/5882/files how do you suggest we proceed? i believe the samples - if left unchanged - would use the old deprecated/experimental API.
[GitHub] spark pull request: [SPARK-6514][SPARK-5960][SPARK-6656] Ability t...
GitHub user cfregly opened a pull request:
https://github.com/apache/spark/pull/5882
[SPARK-6514][SPARK-5960][SPARK-6656] Ability to specify DynamoDB region ...
...separate from Kinesis region (or use the same), ability to pass in AWS credentials and not rely on EC2 IAM roles and other environment settings that may be difficult to set otherwise, ability to override appName for environments that provide you an existing SparkContext/SparkConf where the appName is already set (e.g., Spark Shell)
@tdas (and others): please review. thanks!
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/cfregly/spark kinesis-1.4.0
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/5882.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #5882
commit 2853cb0be83e6a22902d1a93985efef167e8eb44
Author: Chris Fregly
Date: 2015-05-04T05:14:22Z
[SPARK-6514][SPARK-5960][SPARK-6656] Ability to specify DynamoDB region separate from Kinesis region (or use the same), ability to pass in AWS credentials and not rely on EC2 IAM roles and other environment settings that may be difficult to set otherwise, ability to override appName for environments that provide you an existing SparkContext/SparkConf where the appName is already set (e.g., Spark Shell)
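A rough sketch of the configuration surface this PR describes, written as plain Java so it runs without Spark or the AWS SDK. `KinesisAppSettings`, `of`, `withDynamoDbRegion`, and `withAppName` are hypothetical names, not the API that was merged; the sketch only shows the intended fallback (the DynamoDB checkpoint table uses the Kinesis region unless a separate region is given) and an app name override for environments such as the Spark Shell. Credentials are left out here.

```java
import java.util.Objects;

// Hypothetical settings holder for illustration only; not a Spark or AWS class.
final class KinesisAppSettings {
    final String appName;
    final String kinesisRegion;
    final String dynamoDbRegion; // region of the KCL's DynamoDB checkpoint table

    private KinesisAppSettings(String appName, String kinesisRegion, String dynamoDbRegion) {
        this.appName = Objects.requireNonNull(appName, "appName");
        this.kinesisRegion = Objects.requireNonNull(kinesisRegion, "kinesisRegion");
        this.dynamoDbRegion = Objects.requireNonNull(dynamoDbRegion, "dynamoDbRegion");
    }

    // Default: the DynamoDB table lives in the same region as the Kinesis stream.
    static KinesisAppSettings of(String appName, String kinesisRegion) {
        return new KinesisAppSettings(appName, kinesisRegion, kinesisRegion);
    }

    // Opt in to a separate region for the DynamoDB checkpoint table.
    KinesisAppSettings withDynamoDbRegion(String region) {
        return new KinesisAppSettings(appName, kinesisRegion, region);
    }

    // Override the app name, e.g. when an existing SparkConf already fixes it (Spark Shell).
    KinesisAppSettings withAppName(String name) {
        return new KinesisAppSettings(name, kinesisRegion, dynamoDbRegion);
    }
}

class KinesisAppSettingsDemo {
    public static void main(String[] args) {
        KinesisAppSettings settings = KinesisAppSettings.of("wordcount", "us-east-1")
                .withDynamoDbRegion("us-west-2")
                .withAppName("shell-wordcount");
        System.out.println(settings.appName + " " + settings.kinesisRegion + " " + settings.dynamoDbRegion);
    }
}
```

Each `with...` call returns a new object, so a settings value built once can be reused safely across streams.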
[GitHub] spark pull request: Expose regionName setting in Kinesis receiver ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5375#discussion_r28391705
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -39,6 +39,7 @@ object KinesisUtils {
   * @param ssc StreamingContext object
   * @param streamName Kinesis stream name
   * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+  * @param regionName Region name to indicate the location of the Amazon Kinesis service
--- End diff --
we may want to consider making a default for this to maintain backward compatibility. the problem is that the overloaded Scala and Java createStream() methods in this helper class will conflict if you use defaults for this, since Scala allows default arguments on only one of a method's overloaded alternatives. i had the same issue with initialPositionInStream as well as storageLevel, which is why they're not defaults. not sure we can do much about it without renaming the methods to createScalaStream() and createJavaStream() or equivalent.
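Java has no default arguments, and Scala rejects default arguments on more than one overloaded alternative, so a common way to stay backward-compatible is to keep the old signature as an overload that delegates to the new one. The hypothetical sketch below assumes endpoints follow the `https://kinesis.<region>.amazonaws.com` pattern from the doc comment and derives the default region from the host. `StreamFactory` and its methods are illustrative only and return a description string in place of a DStream.

```java
import java.net.URI;

// Hypothetical factory illustrating overload-based defaults; not Spark's KinesisUtils.
final class StreamFactory {
    private StreamFactory() {}

    // Assumes endpoint hosts of the form kinesis.<region>.amazonaws.com.
    static String regionFromEndpoint(String endpointUrl) {
        String host = URI.create(endpointUrl).getHost();
        if (host == null) {
            throw new IllegalArgumentException("No host in endpoint: " + endpointUrl);
        }
        String[] parts = host.split("\\.");
        if (parts.length < 3 || !parts[0].equals("kinesis")) {
            throw new IllegalArgumentException("Unrecognized Kinesis endpoint: " + endpointUrl);
        }
        return parts[1];
    }

    // Old signature, kept so existing callers still compile; derives the region.
    static String createStream(String streamName, String endpointUrl) {
        return createStream(streamName, endpointUrl, regionFromEndpoint(endpointUrl));
    }

    // New signature with an explicit region; returns a description instead of a DStream.
    static String createStream(String streamName, String endpointUrl, String regionName) {
        return streamName + " via " + endpointUrl + " in " + regionName;
    }
}

class StreamFactoryDemo {
    public static void main(String[] args) {
        System.out.println(StreamFactory.createStream("mySparkStream", "https://kinesis.us-west-2.amazonaws.com"));
    }
}
```

Deriving the region from the host is only a heuristic under that naming assumption; the explicit regionName overload still covers endpoints that don't match it.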
[GitHub] spark pull request: Expose regionName setting in Kinesis receiver ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5375#discussion_r28391631
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -70,6 +72,7 @@ object KinesisUtils {
   * @param jssc Java StreamingContext object
   * @param streamName Kinesis stream name
   * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+  * @param regionName Region name to indicate the location of the Amazon Kinesis service
--- End diff --
and here? :) "The Amazon DynamoDB table and Amazon CloudWatch metrics associated with your application will also use this region setting."
[GitHub] spark pull request: Expose regionName setting in Kinesis receiver ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5375#discussion_r28391619
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -39,6 +39,7 @@ object KinesisUtils {
   * @param ssc StreamingContext object
   * @param streamName Kinesis stream name
   * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+  * @param regionName Region name to indicate the location of the Amazon Kinesis service
--- End diff --
same here... "The Amazon DynamoDB table and Amazon CloudWatch metrics associated with your application will also use this region setting."
[GitHub] spark pull request: Expose regionName setting in Kinesis receiver ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5375#discussion_r28391597
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -36,18 +36,19 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
  * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
  * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
  * https://github.com/awslabs/amazon-kinesis-client
- * This is a custom receiver used with StreamingContext.receiverStream(Receiver)
+ * This is a custom receiver used with StreamingContext.receiverStream(Receiver)
  * as described here:
  * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * Instances of this class will get shipped to the Spark Streaming Workers
+ * Instances of this class will get shipped to the Spark Streaming Workers
  * to run within a Spark Executor.
  *
  * @param appName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
  * by the Kinesis Client Library. If you change the App name or Stream name,
- * the KCL will throw errors. This usually requires deleting the backing
+ * the KCL will throw errors. This usually requires deleting the backing
  * DynamoDB table with the same name this Kinesis application.
  * @param streamName Kinesis stream name
  * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Region name to indicate the location of the Amazon Kinesis service
--- End diff --
might want to add a note similar to the KCL README.md when describing this param: "The Amazon DynamoDB table and Amazon CloudWatch metrics associated with your application will also use this region setting."
[GitHub] spark pull request: SPARK-5960 allow aws credentials to be passed ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5439#discussion_r28391083
--- Diff: extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java ---
@@ -131,7 +131,7 @@ public static void main(String[] args) {
     for (int i = 0; i < numStreams; i++) {
       streamsList.add(
         KinesisUtils.createStream(jssc, streamName, endpointUrl, checkpointInterval,
-            InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2())
+            InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2(), null)
--- End diff --
another option is to require a non-null value to tighten up the interface; new DefaultAWSCredentialsProviderChain() would be recommended, but people can pass in their own. again, please make sure there are no Serializable issues if you go this route. i remember having issues here.
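A minimal sketch of the "require a non-null value" option, in plain Java with hypothetical types standing in for the AWS credentials classes. Making the stand-in interface extend `Serializable` moves part of the serialization concern to compile time, and `SerializationCheck.canSerialize` shows one way to catch the rest in a test, by attempting Java serialization roughly as happens when a receiver is shipped to executors. None of these names exist in Spark or the AWS SDK.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

// Hypothetical stand-in for a credentials provider. Extending Serializable makes the
// compiler reject implementations that could not be shipped with the receiver.
interface SerializableCredentialsSource extends Serializable {
    String accessKeyId();
}

// Hypothetical implementation holding a fixed key ID.
final class FixedKeySource implements SerializableCredentialsSource {
    private final String accessKeyId;

    FixedKeySource(String accessKeyId) {
        this.accessKeyId = accessKeyId;
    }

    @Override
    public String accessKeyId() {
        return accessKeyId;
    }
}

// Hypothetical receiver: a non-null source is required, so there is no null "default" path.
final class StrictReceiver implements Serializable {
    private final SerializableCredentialsSource credentials;

    StrictReceiver(SerializableCredentialsSource credentials) {
        this.credentials = Objects.requireNonNull(credentials, "credentials must not be null");
    }

    String accessKeyId() {
        return credentials.accessKeyId();
    }
}

final class SerializationCheck {
    private SerializationCheck() {}

    // Returns true if Java serialization of the object graph succeeds.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // includes NotSerializableException
            return false;
        }
    }
}

class StrictReceiverDemo {
    public static void main(String[] args) {
        StrictReceiver receiver = new StrictReceiver(new FixedKeySource("AKIDEXAMPLE"));
        System.out.println("serializable: " + SerializationCheck.canSerialize(receiver));
    }
}
```

Per the comments in this thread, the real SDK providers are not Serializable, so with the actual AWS types this route only works if the receiver holds serializable data and builds the provider later.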
[GitHub] spark pull request: SPARK-5960 allow aws credentials to be passed ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5439#discussion_r28391006
--- Diff: extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java ---
@@ -131,7 +131,7 @@ public static void main(String[] args) {
     for (int i = 0; i < numStreams; i++) {
       streamsList.add(
         KinesisUtils.createStream(jssc, streamName, endpointUrl, checkpointInterval,
-            InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2())
+            InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2(), null)
--- End diff --
also, you might want to pass this through a variable with a more explicit name. you can still set it to None/null, but naming params goes a long way in these examples.
[GitHub] spark pull request: SPARK-5960 allow aws credentials to be passed ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5439#discussion_r28390980
--- Diff: docs/streaming-kinesis-integration.md ---
@@ -67,6 +67,8 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
  - `[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see Kinesis Checkpointing section and Amazon Kinesis API documentation for more details).
+ - `[credentials]`: Optional AWS credentials.
--- End diff --
it's a bit misleading to say this is optional, since we still need to provide a value (None/null or Some)
[GitHub] spark pull request: SPARK-5960 allow aws credentials to be passed ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5439#discussion_r28390951
--- Diff: docs/streaming-kinesis-integration.md ---
@@ -44,7 +44,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
  JavaReceiverInputDStream kinesisStream = KinesisUtils.createStream(
-   streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]);
+   streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position], [credentials]);
--- End diff --
same here :)
[GitHub] spark pull request: SPARK-5960 allow aws credentials to be passed ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5439#discussion_r28390948
--- Diff: docs/streaming-kinesis-integration.md ---
@@ -32,7 +32,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
  import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
  val kinesisStream = KinesisUtils.createStream(
-   streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position])
+   streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position], [credentials])
--- End diff --
might be good to include some more details on what this is, exactly. this is a bit confusing if you're thinking of this in terms of ACCESS_KEY_ID and SECRET_KEY (2 separate strings).
also, be sure to include any imports that are relevant to this code sample, similar to the InitialPositionInStream import right before it.
[GitHub] spark pull request: SPARK-5960 allow aws credentials to be passed ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5439#discussion_r28390832
--- Diff: extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java ---
@@ -34,7 +34,7 @@ public void testKinesisStream() {
     // Tests the API, does not actually test data receiving
     JavaDStream kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
         "https://kinesis.us-west-2.amazonaws.com", new Duration(2000),
-        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
+        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2(), null);
--- End diff --
would be good to have tests around setting the credentials in the following 2 cases:
1) None (null)
2) Some[AWSCredentials]
[GitHub] spark pull request: SPARK-5960 allow aws credentials to be passed ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5439#discussion_r28390845
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala ---
@@ -113,7 +113,7 @@ private object KinesisWordCountASL extends Logging {
     /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
     val kinesisStreams = (0 until numStreams).map { i =>
       KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
-        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2, None)
--- End diff --
would be nice to have a true default here versus adding to the signature
[GitHub] spark pull request: SPARK-5960 allow aws credentials to be passed ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5439#discussion_r28390844
--- Diff: extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java ---
@@ -131,7 +131,7 @@ public static void main(String[] args) {
     for (int i = 0; i < numStreams; i++) {
       streamsList.add(
         KinesisUtils.createStream(jssc, streamName, endpointUrl, checkpointInterval,
-            InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2())
+            InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2(), null)
--- End diff --
would be nice to have a true default here versus adding to the signature
[GitHub] spark pull request: SPARK-5960 allow aws credentials to be passed ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5439#discussion_r28390830
--- Diff: extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala ---
@@ -86,7 +86,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
     // Tests the API, does not actually test data receiving
     val kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
       "https://kinesis.us-west-2.amazonaws.com", Seconds(2),
-      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2);
+      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2, None)
--- End diff --
would be good to have tests around setting the credentials in the following 2 cases:
1) None (null)
2) Some[AWSCredentials]
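A sketch of what the two requested test cases could check, reduced to the resolution logic so it runs without Spark, ScalaTest, or JUnit. `TestCredentials`, `CredentialResolver.resolve`, and the supplier standing in for the default provider chain are hypothetical. Java's `Optional.empty()` plays the role of `None`/`null`, and `Optional.of(...)` plays the role of `Some[AWSCredentials]`.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical credentials value for the sketch.
final class TestCredentials {
    final String accessKeyId;

    TestCredentials(String accessKeyId) {
        this.accessKeyId = accessKeyId;
    }
}

final class CredentialResolver {
    private CredentialResolver() {}

    // Explicit credentials win; otherwise the default chain is consulted lazily.
    static TestCredentials resolve(Optional<TestCredentials> explicit,
                                   Supplier<TestCredentials> defaultChain) {
        return explicit.orElseGet(defaultChain);
    }
}

class CredentialResolverCases {
    public static void main(String[] args) {
        AtomicInteger defaultCalls = new AtomicInteger();
        Supplier<TestCredentials> chain = () -> {
            defaultCalls.incrementAndGet();
            return new TestCredentials("from-default-chain");
        };

        // Case 1: None / null (a Java caller's null maps to Optional.empty()).
        TestCredentials none = CredentialResolver.resolve(Optional.empty(), chain);
        check(none.accessKeyId.equals("from-default-chain"), "absent credentials should use the chain");

        // Case 2: Some(credentials): explicit credentials are used and the chain is not consulted.
        TestCredentials some = CredentialResolver.resolve(Optional.of(new TestCredentials("explicit")), chain);
        check(some.accessKeyId.equals("explicit"), "explicit credentials should win");
        check(defaultCalls.get() == 1, "chain should only be consulted in case 1");

        System.out.println("both credential cases pass");
    }

    private static void check(boolean ok, String message) {
        if (!ok) {
            throw new AssertionError(message);
        }
    }
}
```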
[GitHub] spark pull request: SPARK-5960 allow aws credentials to be passed ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5439#discussion_r28390749
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -59,9 +61,10 @@ object KinesisUtils {
       endpointUrl: String,
       checkpointInterval: Duration,
       initialPositionInStream: InitialPositionInStream,
-      storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
+      storageLevel: StorageLevel,
+      credentials: Option[AWSCredentials]): ReceiverInputDStream[Array[Byte]] = {
--- End diff --
any thought to using a default param, with new DefaultAWSCredentialsProviderChain() as the default, to reduce the API changes?
[GitHub] spark pull request: SPARK-5960 allow aws credentials to be passed ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5439#discussion_r28390679
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala ---
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.streaming.kinesis
+import com.amazonaws.auth.AWSCredentials
--- End diff --
my initial attempt at adding this support introduced a BasicAWSCredentialsProvider as follows: https://github.com/cfregly/spark/blob/kinesis-dbc/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/BasicAWSCredentialsProvider.scala
highlights:
1) takes AccessKey and SecretKey as Strings
2) follows the same hierarchy as the rest of the AWSCredentialsProviders (including not being Serializable)
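The two highlights can be illustrated with a small stdlib-only sketch: credentials built from two plain strings, exposed through a provider interface loosely modeled on the SDK's `getCredentials()`/`refresh()` shape, and deliberately not `Serializable`. `KeyPairCredentials`, `KeyProvider`, and `StaticKeyProvider` are hypothetical names. The actual implementation is the `BasicAWSCredentialsProvider` at the link above, and this is not its code.

```java
// Hypothetical credentials value: access key ID plus secret key.
final class KeyPairCredentials {
    final String accessKeyId;
    final String secretKey;

    KeyPairCredentials(String accessKeyId, String secretKey) {
        if (accessKeyId == null || accessKeyId.isEmpty() || secretKey == null || secretKey.isEmpty()) {
            throw new IllegalArgumentException("access key ID and secret key are both required");
        }
        this.accessKeyId = accessKeyId;
        this.secretKey = secretKey;
    }

    @Override
    public String toString() {
        // Never print the secret key (e.g. in logs).
        return "KeyPairCredentials(" + accessKeyId + ", ****)";
    }
}

// Hypothetical provider interface, loosely mirroring getCredentials()/refresh().
interface KeyProvider {
    KeyPairCredentials getCredentials();

    void refresh();
}

// Built from two strings; intentionally NOT Serializable, so it should be
// constructed where it is used (e.g. inside the receiver's onStart()).
final class StaticKeyProvider implements KeyProvider {
    private final KeyPairCredentials credentials;

    StaticKeyProvider(String accessKeyId, String secretKey) {
        this.credentials = new KeyPairCredentials(accessKeyId, secretKey);
    }

    @Override
    public KeyPairCredentials getCredentials() {
        return credentials;
    }

    @Override
    public void refresh() {
        // Static keys: nothing to refresh.
    }
}

class StaticKeyProviderDemo {
    public static void main(String[] args) {
        KeyProvider provider = new StaticKeyProvider("AKIDEXAMPLE", "not-a-real-secret");
        System.out.println(provider.getCredentials());
    }
}
```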
[GitHub] spark pull request: SPARK-5960 allow aws credentials to be passed ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5439#discussion_r28390431
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -82,15 +82,19 @@ private[kinesis] class KinesisReceiver(
   var workerId: String = null
   /*
-   * This impl uses the DefaultAWSCredentialsProviderChain and searches for credentials
-   * in the following order of precedence:
+   * This impl uses the DefaultAWSCredentialsProviderChain unless it's provided by constructor
+   * and searches for credentials in the following order of precedence:
    * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
    * Java System Properties - aws.accessKeyId and aws.secretKey
    * Credential profiles file at the default location (~/.aws/credentials) shared by all
    * AWS SDKs and the AWS CLI
    * Instance profile credentials delivered through the Amazon EC2 metadata service
    */
-  var credentialsProvider: AWSCredentialsProvider = null
+  var credentialsProvider: AWSCredentialsProvider = (credentials map { cr => new AWSCredentialsProvider {
--- End diff --
i seem to remember having problems when constructing the AWSCredentialsProvider outside of the onStart() method due to this hierarchy (including DefaultAWSCredentialsProviderChain) not being Serializable. i assume this works, but i'm wondering what may have changed.
also, is there a simpler way to do this in Scala? i feel like this could be simplified, but maybe i'm missing something. it seems like a lot of syntax for something so simple.
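On the serialization question, one pattern that avoids it is to ship only serializable data (the optional key strings) with the receiver, mark the provider field `transient`, and build the provider in `onStart()` on the executor. The Java sketch below uses hypothetical names (`LazyCredentialsReceiver`, `CredentialSource`, `RoundTrip`) and is not the PR's Scala code. `RoundTrip.copy` imitates shipping by serializing and deserializing the receiver.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical provider type; like the SDK's providers, it is NOT Serializable.
interface CredentialSource {
    String describe();
}

// Hypothetical receiver: only the serializable key strings travel with it.
final class LazyCredentialsReceiver implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String accessKeyId; // null means "use the default chain"
    private final String secretKey;
    private transient CredentialSource provider; // skipped by serialization

    LazyCredentialsReceiver(String accessKeyId, String secretKey) {
        this.accessKeyId = accessKeyId;
        this.secretKey = secretKey;
    }

    // Stands in for Receiver.onStart(), which runs on the executor after deserialization.
    void onStart() {
        if (accessKeyId != null && secretKey != null) {
            final String id = accessKeyId; // copy so the lambda does not capture `this`
            provider = () -> "static credentials for " + id;
        } else {
            provider = () -> "default provider chain";
        }
    }

    CredentialSource provider() {
        if (provider == null) {
            throw new IllegalStateException("onStart() has not run on this instance");
        }
        return provider;
    }
}

final class RoundTrip {
    private RoundTrip() {}

    // Serializes and deserializes a value, roughly as happens when a receiver is shipped.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}

class LazyCredentialsDemo {
    public static void main(String[] args) {
        LazyCredentialsReceiver shipped = RoundTrip.copy(new LazyCredentialsReceiver("AKIDEXAMPLE", "secret"));
        shipped.onStart();
        System.out.println(shipped.provider().describe());
    }
}
```

Note that with this approach the key strings themselves travel inside the serialized receiver.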
[GitHub] spark pull request: SPARK-5960 allow aws credentials to be passed ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5439#discussion_r28390315
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -82,15 +82,19 @@ private[kinesis] class KinesisReceiver(
   var workerId: String = null
   /*
-   * This impl uses the DefaultAWSCredentialsProviderChain and searches for credentials
-   * in the following order of precedence:
+   * This impl uses the DefaultAWSCredentialsProviderChain unless it's provided by constructor
+   * and searches for credentials in the following order of precedence:
--- End diff --
might be good to reword this to be more explicit about the 2 different scenarios:
1) Some[AWSCredentials] are provided, in which case we use those
2) None are provided, and therefore a DefaultAWSCredentialsProviderChain will be constructed, which searches for credentials in the following order...
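For the second scenario, the lookup order listed in the doc comment amounts to "the first source that yields a complete key pair wins". The sketch below models only the first two links (environment variables and Java system properties) with stdlib Java, and only checks the variable and property names listed in the comment. The profile file and EC2 instance metadata links are omitted. `ChainKeys` and `CredentialChainSketch` are hypothetical and are not the SDK's `DefaultAWSCredentialsProviderChain`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;

// Hypothetical key pair; `source` records which link of the chain produced it.
final class ChainKeys {
    final String accessKeyId;
    final String secretKey;
    final String source;

    ChainKeys(String accessKeyId, String secretKey, String source) {
        this.accessKeyId = accessKeyId;
        this.secretKey = secretKey;
        this.source = source;
    }
}

final class CredentialChainSketch {
    private CredentialChainSketch() {}

    // Returns the first credentials any source yields, trying sources in list order.
    static Optional<ChainKeys> firstAvailable(List<Supplier<Optional<ChainKeys>>> sources) {
        for (Supplier<Optional<ChainKeys>> source : sources) {
            Optional<ChainKeys> found = source.get();
            if (found.isPresent()) {
                return found;
            }
        }
        return Optional.empty();
    }

    // Link 1: environment variables (only the names listed in the comment are checked).
    static Optional<ChainKeys> fromEnv(Map<String, String> env) {
        return pair(env.get("AWS_ACCESS_KEY_ID"), env.get("AWS_SECRET_KEY"), "environment");
    }

    // Link 2: Java system properties.
    static Optional<ChainKeys> fromSystemProperties(Properties props) {
        return pair(props.getProperty("aws.accessKeyId"), props.getProperty("aws.secretKey"), "system properties");
    }

    private static Optional<ChainKeys> pair(String id, String secret, String source) {
        if (id == null || secret == null) {
            return Optional.empty();
        }
        return Optional.of(new ChainKeys(id, secret, source));
    }
}

class CredentialChainDemo {
    public static void main(String[] args) {
        List<Supplier<Optional<ChainKeys>>> chain = new ArrayList<>();
        chain.add(() -> CredentialChainSketch.fromEnv(System.getenv()));
        chain.add(() -> CredentialChainSketch.fromSystemProperties(System.getProperties()));
        // Links 3 and 4 (profile file, EC2 instance metadata) are omitted in this sketch.
        System.out.println(CredentialChainSketch.firstAvailable(chain)
                .map(keys -> "found credentials in " + keys.source)
                .orElse("no credentials found"));
    }
}
```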
[GitHub] spark pull request: SPARK-5960 allow aws credentials to be passed ...
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/5439#discussion_r28390150
--- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---
@@ -24,8 +24,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.receiver.Receiver
-import com.amazonaws.auth.AWSCredentialsProvider
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}
 import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
--- End diff --
collapse the IRecordProcessor and IRecordProcessorFactory into a single import, as well?
[GitHub] spark pull request: Expose regionName setting in Kinesis receiver ...
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/5375#issuecomment-90308088
@srowen @kopiczko This is part of a larger effort to overhaul Kinesis-based streaming, slated for 1.4: lots of API changes, including region, AWS credentials, and application name, as well as upgrades to both the AWS Java SDK and the KCL.
Here's the parent jira: https://issues.apache.org/jira/browse/SPARK-6599
Here's the related jira that covers the region portion: https://issues.apache.org/jira/browse/SPARK-6514
We should definitely try to be backward-compatible, even though the API is Experimental.
[GitHub] spark pull request: clean up println's
GitHub user cfregly opened a pull request:
https://github.com/apache/spark/pull/5153
clean up println's
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/cfregly/spark ml-docs
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/5153.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #5153
commit db75eeccbfc1e1d4c74901a96e4e100ebf0e8a18
Author: Chris Fregly
Date: 2015-03-24T04:11:29Z
cleaned up println's a bit
[GitHub] spark pull request: [SPARK-5187][SQL] Fix caching of tables with H...
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/3987#issuecomment-69423882
lgtm. as we just discussed, this is the same code path as SchemaRDD.cache(), so no need for additional tests.
[GitHub] spark pull request: [SPARK-3639] [Streaming] [Kinesis] Allow users...
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/3092#issuecomment-65963749
@tdas the summary is here: https://issues.apache.org/jira/browse/SPARK-3640?focusedCommentId=14204334&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14204334
@aniketbhatnagar: is this still a valid Jira and PR? or should we close them? lemme know. thanks! -chris
[GitHub] spark pull request: [SPARK-3639] [Streaming] [Kinesis] Allow users...
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/3092#issuecomment-62340980
@tdas i've updated SPARK-3640 with an offline discussion between aniket and me. we may not need to integrate this PR. waiting for aniket to confirm. thanks! -chris
[GitHub] spark pull request: [SPARK-3639] [Streaming] [Kinesis] Allow users...
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/3092#issuecomment-62340672
@aniketbhatnagar it looks like the title of this PR should reference SPARK-3640 instead. can you change this? thx.
[GitHub] spark pull request: [SPARK-1981][Streaming] Updated kinesis docs a...
Github user cfregly closed the pull request at: https://github.com/apache/spark/pull/2306
[GitHub] spark pull request: [SPARK-1981][Streaming] Updated kinesis docs a...
GitHub user cfregly opened a pull request: https://github.com/apache/spark/pull/2306 [SPARK-1981][Streaming] Updated kinesis docs and added architecture diagram You can merge this pull request into a Git repository by running: $ git pull https://github.com/cfregly/spark kinesis-doc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2306.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2306 commit b3b0ff118cac3c0a5a10f9912b383bb0665c9a1b Author: Chris Fregly Date: 2014-07-16T07:03:04Z [SPARK-1981] Add AWS Kinesis streaming support commit d18e680670dfed3039477941e953b5811adde4ab Author: Chris Fregly Date: 2014-07-16T22:23:21Z Merge remote-tracking branch 'upstream/master' Resolved conflict: project/SparkBuild.scala commit cd68c0d7bb0c1ef38e7c92d0cd6eb4a7ccf2ce27 Author: Chris Fregly Date: 2014-07-19T03:16:13Z fixed typos and backward compatibility commit e7c8978482e1092cf599a31b402ab1f4ec11e36c Author: Chris Fregly Date: 2014-07-22T03:09:42Z Merge remote-tracking branch 'upstream/master' commit 828f8aeb1081cf7ad9e5386e1cce933ece9c3d62 Author: Chris Fregly Date: 2014-07-22T05:20:42Z more cleanup commit 338997e6e750c206bfb50a654b725be5f33beb07 Author: Chris Fregly Date: 2014-07-22T15:54:35Z improve build docs for kinesis commit 6c395619dde93a9b8e9137b1150de4ae5129cf4b Author: Chris Fregly Date: 2014-07-24T03:55:55Z parameterized the versions of the aws java sdk and kinesis client commit 21de67fbc5298ce73dd36fe8372dbeee27f69854 Author: Chris Fregly Date: 2014-07-31T00:43:29Z Merge remote-tracking branch 'upstream/master' Incorporated feedback from TD. 
Renamed project to kinesis-asl, simplified examples, improved docs, fixed bug with AWS regions outside of us-east-1 commit db3eefd0b845ade8251235698c74f8bdc5d35e5a Author: Chris Fregly Date: 2014-07-31T00:43:40Z Merge remote-tracking branch 'upstream/master' commit 912640cb344c77102e4ca4d884b8b0d0206ed627 Author: Chris Fregly Date: 2014-07-31T01:03:27Z changed the foundKinesis class to be a publically-avail class commit d17ca6d6a36ddf0a3030eacae0eace3fdd758cc5 Author: Chris Fregly Date: 2014-07-31T17:00:09Z per TD's feedback: updated docs, simplified the KinesisUtils api commit bf614e9ed870a3c23670d3783d574b1e4280bd81 Author: Chris Fregly Date: 2014-07-31T17:33:20Z per matei's feedback: moved the kinesis examples into the examples/ dir commit e33cbeb0238be90a878b71bc7354f957dfe45891 Author: Chris Fregly Date: 2014-08-01T06:07:46Z Merge remote-tracking branch 'upstream/master' commit 74e5c7c3ce99f5cd30d269d62aca31d2b275288c Author: Chris Fregly Date: 2014-08-01T21:14:42Z updated per TD's feedback. 
simplified examples, updated docs commit 0e1c67b079b87b12f58557922ea7d4b257ee571b Author: Chris Fregly Date: 2014-08-01T21:15:41Z Merge remote-tracking branch 'upstream/master' commit 691a6be900015358d55a03c046f93d6336297ea2 Author: Chris Fregly Date: 2014-08-01T21:47:00Z fixed tests and formatting, fixed a bug with JavaKinesisWordCount during union of streams commit 0393795b53c2789973c081dba6f7651fd8678adc Author: Chris Fregly Date: 2014-08-02T02:23:04Z moved Kinesis examples out of examples/ and back into extras/kinesis-asl updated the build to only include kinesis-asl inside the examples jar when -Pkinesis-asl is specified commit 47745816b21d7d2255a98283e3055a5a2a397a27 Author: Chris Fregly Date: 2014-08-02T07:10:07Z updated docs, renamed retry to retryRandom to be more clear, removed retries around store() method commit 8e1ae2e2174e78bac1b73cb72ceec45adad1c35a Author: Chris Fregly Date: 2014-08-03T02:02:40Z Merge remote-tracking branch 'upstream/master' commit 862df67eddc234e86a047a1ddae7a438d794c280 Author: Chris Fregly Date: 2014-08-03T23:38:56Z Merge remote-tracking branch 'upstream/master' commit 0f37061e2e5ac4a73213d2dccc848aa8bc57b56f Author: Chris Fregly Date: 2014-08-03T23:40:45Z SPARK-1981: (Kinesis streaming support) updated streaming-kinesis.md commit 9b1c71afb83856c4f0f68cbf92d2b6b444a0bb73 Author: Chris Fregly Date: 2014-08-03T23:56:06Z better explained why spark checkpoints are disabled in the example (due to no stateful operations being used) commit baefa30c8acd8ad2d26e4a7f54b071cdaab09b58 Author: Chris Fregly Date: 2014-09-06T20:14:18Z Merge branch 'master' of https://github.com/cfregly/spark Conflicts: docs/streaming-kinesis.md commit fed0ad926658f50a958edb62d4f0c4733f1e34d5 Author: Chris Fregly Date: 2014-09-06T20:20:21Z updated kinesis docs and added an arch diagra
[GitHub] spark pull request: [SPARK-1981] updated streaming-kinesis.md
GitHub user cfregly opened a pull request: https://github.com/apache/spark/pull/1757 [SPARK-1981] updated streaming-kinesis.md fixed markup, separated out sections more-clearly, more thorough explanations You can merge this pull request into a Git repository by running: $ git pull https://github.com/cfregly/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1757.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1757 commit b3b0ff118cac3c0a5a10f9912b383bb0665c9a1b Author: Chris Fregly Date: 2014-07-16T07:03:04Z [SPARK-1981] Add AWS Kinesis streaming support commit d18e680670dfed3039477941e953b5811adde4ab Author: Chris Fregly Date: 2014-07-16T22:23:21Z Merge remote-tracking branch 'upstream/master' Resolved conflict: project/SparkBuild.scala commit cd68c0d7bb0c1ef38e7c92d0cd6eb4a7ccf2ce27 Author: Chris Fregly Date: 2014-07-19T03:16:13Z fixed typos and backward compatibility commit e7c8978482e1092cf599a31b402ab1f4ec11e36c Author: Chris Fregly Date: 2014-07-22T03:09:42Z Merge remote-tracking branch 'upstream/master' commit 828f8aeb1081cf7ad9e5386e1cce933ece9c3d62 Author: Chris Fregly Date: 2014-07-22T05:20:42Z more cleanup commit 338997e6e750c206bfb50a654b725be5f33beb07 Author: Chris Fregly Date: 2014-07-22T15:54:35Z improve build docs for kinesis commit 6c395619dde93a9b8e9137b1150de4ae5129cf4b Author: Chris Fregly Date: 2014-07-24T03:55:55Z parameterized the versions of the aws java sdk and kinesis client commit 21de67fbc5298ce73dd36fe8372dbeee27f69854 Author: Chris Fregly Date: 2014-07-31T00:43:29Z Merge remote-tracking branch 'upstream/master' Incorporated feedback from TD. 
Renamed project to kinesis-asl, simplified examples, improved docs, fixed bug with AWS regions outside of us-east-1 commit db3eefd0b845ade8251235698c74f8bdc5d35e5a Author: Chris Fregly Date: 2014-07-31T00:43:40Z Merge remote-tracking branch 'upstream/master' commit 912640cb344c77102e4ca4d884b8b0d0206ed627 Author: Chris Fregly Date: 2014-07-31T01:03:27Z changed the foundKinesis class to be a publically-avail class commit d17ca6d6a36ddf0a3030eacae0eace3fdd758cc5 Author: Chris Fregly Date: 2014-07-31T17:00:09Z per TD's feedback: updated docs, simplified the KinesisUtils api commit bf614e9ed870a3c23670d3783d574b1e4280bd81 Author: Chris Fregly Date: 2014-07-31T17:33:20Z per matei's feedback: moved the kinesis examples into the examples/ dir commit e33cbeb0238be90a878b71bc7354f957dfe45891 Author: Chris Fregly Date: 2014-08-01T06:07:46Z Merge remote-tracking branch 'upstream/master' commit 74e5c7c3ce99f5cd30d269d62aca31d2b275288c Author: Chris Fregly Date: 2014-08-01T21:14:42Z updated per TD's feedback. 
simplified examples, updated docs commit 0e1c67b079b87b12f58557922ea7d4b257ee571b Author: Chris Fregly Date: 2014-08-01T21:15:41Z Merge remote-tracking branch 'upstream/master' commit 691a6be900015358d55a03c046f93d6336297ea2 Author: Chris Fregly Date: 2014-08-01T21:47:00Z fixed tests and formatting, fixed a bug with JavaKinesisWordCount during union of streams commit 0393795b53c2789973c081dba6f7651fd8678adc Author: Chris Fregly Date: 2014-08-02T02:23:04Z moved Kinesis examples out of examples/ and back into extras/kinesis-asl updated the build to only include kinesis-asl inside the examples jar when -Pkinesis-asl is specified commit 47745816b21d7d2255a98283e3055a5a2a397a27 Author: Chris Fregly Date: 2014-08-02T07:10:07Z updated docs, renamed retry to retryRandom to be more clear, removed retries around store() method commit 8e1ae2e2174e78bac1b73cb72ceec45adad1c35a Author: Chris Fregly Date: 2014-08-03T02:02:40Z Merge remote-tracking branch 'upstream/master' commit 862df67eddc234e86a047a1ddae7a438d794c280 Author: Chris Fregly Date: 2014-08-03T23:38:56Z Merge remote-tracking branch 'upstream/master' commit 0f37061e2e5ac4a73213d2dccc848aa8bc57b56f Author: Chris Fregly Date: 2014-08-03T23:40:45Z SPARK-1981: (Kinesis streaming support) updated streaming-kinesis.md
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15727693 --- Diff: dev/audit-release/audit_release.py --- @@ -105,7 +105,7 @@ def get_url(url): "spark-core", "spark-bagel", "spark-mllib", "spark-streaming", "spark-repl", "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq", -"spark-catalyst", "spark-sql", "spark-hive" +"spark-catalyst", "spark-sql", "spark-hive", "kinesis-asl" --- End diff -- changed
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15727694 --- Diff: dev/audit-release/sbt_app_kinesis/build.sbt --- @@ -0,0 +1,30 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +name := "Kinesis Test" + +version := "1.0" + +scalaVersion := System.getenv.get("SCALA_VERSION") + +libraryDependencies += "org.apache.spark" %% "spark-core" % System.getenv.get("SPARK_VERSION") +libraryDependencies += "org.apache.spark" %% "spark-streaming" % System.getenv.get("SPARK_VERSION") --- End diff -- gotcha
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15727688 --- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala --- @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import java.nio.ByteBuffer + +import scala.util.Random + +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.kinesis.KinesisUtils + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.model.PutRecordRequest + +/** + * Kinesis Spark Streaming WordCount example. + * + * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details on + * the Kinesis Spark Streaming integration. 
+ * + * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard + * for the given stream. + * It then starts pulling from the last checkpointed sequence number of the given + *and . + * + * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials + * in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: KinesisWordCountASL + *is the name of the Kinesis stream (ie. mySparkStream) + *is the endpoint of the Kinesis service + * (ie. https://kinesis.us-east-1.amazonaws.com) + * + * Example: + *$ export AWS_ACCESS_KEY_ID= + *$ export AWS_SECRET_KEY= + *$ $SPARK_HOME/bin/run-example \ + *org.apache.spark.examples.streaming.KinesisWordCountASL mySparkStream \ + *https://kinesis.us-east-1.amazonaws.com + * + * There is a companion helper class below called KinesisWordCountProducerASL which puts + * dummy data onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducerASL are provided in that class definition. + */ +object KinesisWordCountASL extends Logging { + def main(args: Array[String]) { +/** + * Check that all required args were passed in. + */ +if (args.length < 2) { + System.err.println( +""" + |Usage: KinesisWordCount + | is the name of the Kinesis stream + | is the endpoint of the Kinesis service + | (e.g. 
https://kinesis.us-east-1.amazonaws.com) +""".stripMargin) + System.exit(1) +} + +StreamingExamples.setStreamingLogLevels() + +/** Populate the appropriate variables from the given args */ +val Array(streamName, endpointUrl) = args + +/** Determine the number of shards from the stream */ +val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()) +kinesisClient.setEndpoint(endpointUrl) +val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards() + .size() + +/** In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */ +val numStreams =
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15727587 --- Diff: extras/kinesis-asl/pom.xml --- @@ -0,0 +1,99 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + 4.0.0 + +org.apache.spark +spark-parent +1.1.0-SNAPSHOT +../../pom.xml + + +
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15727580 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.Duration +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Helper class to create Amazon Kinesis Input Stream + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** --- End diff -- removed and cleaned up imports
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15727578 --- Diff: bin/run-example --- @@ -29,7 +29,9 @@ if [ -n "$1" ]; then else echo "Usage: ./bin/run-example [example-args]" 1>&2 echo " - set MASTER=XX to use a specific master" 1>&2 - echo " - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2 + echo " - can use abbreviated example class name relative to com.apache.spark.examples" 1>&2 + echo " (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)" 1>&2 + echo " - to run the Kinesis Spark Streaming example, make sure you build with -Pkinesis-asl" 1>&2 --- End diff -- makes sense. i'll make sure this is addressed in streaming-kinesis.md.
[GitHub] spark pull request: SPARK-2770: Rename spark-ganglia-lgpl to gangl...
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/1730#issuecomment-50951751 oops. looks like too much got picked up. closing this PR. will retry later.
[GitHub] spark pull request: SPARK-2770: Rename spark-ganglia-lgpl to gangl...
Github user cfregly closed the pull request at: https://github.com/apache/spark/pull/1730
[GitHub] spark pull request: SPARK-2770: Rename spark-ganglia-lgpl to gangl...
GitHub user cfregly opened a pull request: https://github.com/apache/spark/pull/1730 SPARK-2770: Rename spark-ganglia-lgpl to ganglia-lgpl this PR may conflict a bit with https://github.com/apache/spark/pull/1434 (SPARK-1981 Kinesis Streaming support) due to the timing and overlapping build changes, but the merge should be pretty straightforward, hopefully. You can merge this pull request into a Git repository by running: $ git pull https://github.com/cfregly/spark SPARK-2770 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1730.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1730 commit b3b0ff118cac3c0a5a10f9912b383bb0665c9a1b Author: Chris Fregly Date: 2014-07-16T07:03:04Z [SPARK-1981] Add AWS Kinesis streaming support commit d18e680670dfed3039477941e953b5811adde4ab Author: Chris Fregly Date: 2014-07-16T22:23:21Z Merge remote-tracking branch 'upstream/master' Resolved conflict: project/SparkBuild.scala commit cd68c0d7bb0c1ef38e7c92d0cd6eb4a7ccf2ce27 Author: Chris Fregly Date: 2014-07-19T03:16:13Z fixed typos and backward compatibility commit e7c8978482e1092cf599a31b402ab1f4ec11e36c Author: Chris Fregly Date: 2014-07-22T03:09:42Z Merge remote-tracking branch 'upstream/master' commit 828f8aeb1081cf7ad9e5386e1cce933ece9c3d62 Author: Chris Fregly Date: 2014-07-22T05:20:42Z more cleanup commit 338997e6e750c206bfb50a654b725be5f33beb07 Author: Chris Fregly Date: 2014-07-22T15:54:35Z improve build docs for kinesis commit 6c395619dde93a9b8e9137b1150de4ae5129cf4b Author: Chris Fregly Date: 2014-07-24T03:55:55Z parameterized the versions of the aws java sdk and kinesis client commit 21de67fbc5298ce73dd36fe8372dbeee27f69854 Author: Chris Fregly Date: 2014-07-31T00:43:29Z Merge remote-tracking branch 'upstream/master' Incorporated feedback from TD.
Renamed project to kinesis-asl, simplified examples, improved docs, fixed bug with AWS regions outside of us-east-1 commit db3eefd0b845ade8251235698c74f8bdc5d35e5a Author: Chris Fregly Date: 2014-07-31T00:43:40Z Merge remote-tracking branch 'upstream/master' commit 912640cb344c77102e4ca4d884b8b0d0206ed627 Author: Chris Fregly Date: 2014-07-31T01:03:27Z changed the foundKinesis class to be a publically-avail class commit d17ca6d6a36ddf0a3030eacae0eace3fdd758cc5 Author: Chris Fregly Date: 2014-07-31T17:00:09Z per TD's feedback: updated docs, simplified the KinesisUtils api commit bf614e9ed870a3c23670d3783d574b1e4280bd81 Author: Chris Fregly Date: 2014-07-31T17:33:20Z per matei's feedback: moved the kinesis examples into the examples/ dir commit 4b36050fe7d4e5b35d474f4f7135cab2d98c48b2 Author: Chris Fregly Date: 2014-08-01T21:52:54Z Merge remote-tracking branch 'upstream/master' into SPARK-2770 commit 2a213155697c5e922d3336210b03f97ace8ae5ab Author: Chris Fregly Date: 2014-08-01T22:31:49Z SPARK-2770: Rename spark-ganglia-lgpl to ganglia-lgpl
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15685865 --- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala --- @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming + +import java.nio.ByteBuffer + +import scala.util.Random + +import org.apache.log4j.Level +import org.apache.log4j.Logger +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext.rddToOrderedRDDFunctions +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer +import org.apache.spark.streaming.kinesis.KinesisUtils + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.model.PutRecordRequest + +/** + * Kinesis Spark Streaming WordCount example. + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on + * the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the + * given stream. + * It then starts pulling from the last checkpointed sequence number of the given + *and . 
+ * + * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials + * in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: KinesisWordCount + *is the name of the Kinesis stream (ie. mySparkStream) + *is the endpoint of the Kinesis service + * (ie. https://kinesis.us-east-1.amazonaws.com) + * + * Example: + *$ export AWS_ACCESS_KEY_ID= + *$ export AWS_SECRET_KEY= + *$ $SPARK_HOME/bin/run-example \ + *org.apache.spark.examples.streaming.KinesisWordCount mySparkStream \ + *https://kinesis.us-east-1.amazonaws.com + * + * There is a companion helper class below called KinesisWordCountProducer which puts + * dummy data onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducer are provided in that class definition. + */ +object KinesisWordCount extends Logging { + val WordSeparator = " " + + def main(args: Array[String]) { +/** + * Check that all required args were passed in. + */ +if (args.length < 2) { + System.err.println("Usage: KinesisWordCount ") + System.exit(1) +} + +/** + * (This was lifted from the StreamingExamples.scala in order to avoid the dependency + * on the spark-examples artifact.) + * Set reasonable logging levels for streaming if the user has not configured log4j. + */ +val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements() +if (!log4jInitialized) { + /** + * We first log something to initialize Spark's default logging, + * then we override the logging level. + * */ + logInfo(&q
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15685887 --- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala --- @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming + +import java.nio.ByteBuffer + +import scala.util.Random + +import org.apache.log4j.Level +import org.apache.log4j.Logger +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext.rddToOrderedRDDFunctions +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer +import org.apache.spark.streaming.kinesis.KinesisUtils + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.model.PutRecordRequest + +/** + * Kinesis Spark Streaming WordCount example. + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on + * the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the + * given stream. + * It then starts pulling from the last checkpointed sequence number of the given + *and . 
+ * + * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials + * in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: KinesisWordCount + *is the name of the Kinesis stream (ie. mySparkStream) + *is the endpoint of the Kinesis service + * (ie. https://kinesis.us-east-1.amazonaws.com) + * + * Example: + *$ export AWS_ACCESS_KEY_ID= + *$ export AWS_SECRET_KEY= + *$ $SPARK_HOME/bin/run-example \ + *org.apache.spark.examples.streaming.KinesisWordCount mySparkStream \ + *https://kinesis.us-east-1.amazonaws.com + * + * There is a companion helper class below called KinesisWordCountProducer which puts + * dummy data onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducer are provided in that class definition. + */ +object KinesisWordCount extends Logging { + val WordSeparator = " " + + def main(args: Array[String]) { +/** + * Check that all required args were passed in. + */ +if (args.length < 2) { + System.err.println("Usage: KinesisWordCount ") + System.exit(1) +} + +/** + * (This was lifted from the StreamingExamples.scala in order to avoid the dependency + * on the spark-examples artifact.) + * Set reasonable logging levels for streaming if the user has not configured log4j. + */ +val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements() +if (!log4jInitialized) { + /** + * We first log something to initialize Spark's default logging, + * then we override the logging level. + * */ + logInfo(&q
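The quoted doc comment lists the credential lookup order that DefaultAWSCredentialsProviderChain follows. The Java sketch below illustrates only the first two links of that chain: environment variables first, then Java system properties. It is a hypothetical illustration, not the AWS SDK class. The class and method names are invented, and the profile-file and instance-profile steps are deliberately left out.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

// Hypothetical illustration of the precedence order described above;
// NOT the AWS SDK's DefaultAWSCredentialsProviderChain.
final class CredentialChainSketch {

    // Minimal holder for a key pair plus the source it was resolved from.
    static final class Creds {
        final String accessKeyId;
        final String secretKey;
        final String source;

        Creds(String accessKeyId, String secretKey, String source) {
            this.accessKeyId = accessKeyId;
            this.secretKey = secretKey;
            this.source = source;
        }
    }

    // Environment variables win; Java system properties are consulted second.
    // Profile-file and EC2 instance-profile lookups are omitted from this sketch.
    static Optional<Creds> resolve(Map<String, String> env, Properties sysProps) {
        String envKey = env.get("AWS_ACCESS_KEY_ID");
        String envSecret = env.get("AWS_SECRET_KEY");
        if (envKey != null && envSecret != null) {
            return Optional.of(new Creds(envKey, envSecret, "environment"));
        }
        String propKey = sysProps.getProperty("aws.accessKeyId");
        String propSecret = sysProps.getProperty("aws.secretKey");
        if (propKey != null && propSecret != null) {
            return Optional.of(new Creds(propKey, propSecret, "system properties"));
        }
        return Optional.empty();
    }
}
```

A real caller would pass `System.getenv()` and `System.getProperties()`. Taking them as parameters keeps the precedence logic testable without touching the process environment.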
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15685855 --- Diff: examples/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala --- @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming + +import java.nio.ByteBuffer + +import scala.util.Random + +import org.apache.log4j.Level +import org.apache.log4j.Logger +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext.rddToOrderedRDDFunctions +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer +import org.apache.spark.streaming.kinesis.KinesisUtils + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.model.PutRecordRequest + +/** + * Kinesis Spark Streaming WordCount example. + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on + * the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the + * given stream. + * It then starts pulling from the last checkpointed sequence number of the given + *and . 
+ * + * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials + * in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: KinesisWordCount + *is the name of the Kinesis stream (ie. mySparkStream) + *is the endpoint of the Kinesis service + * (ie. https://kinesis.us-east-1.amazonaws.com) + * + * Example: + *$ export AWS_ACCESS_KEY_ID= + *$ export AWS_SECRET_KEY= + *$ $SPARK_HOME/bin/run-example \ + *org.apache.spark.examples.streaming.KinesisWordCount mySparkStream \ + *https://kinesis.us-east-1.amazonaws.com + * + * There is a companion helper class below called KinesisWordCountProducer which puts + * dummy data onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducer are provided in that class definition. + */ +object KinesisWordCount extends Logging { + val WordSeparator = " " + + def main(args: Array[String]) { +/** + * Check that all required args were passed in. + */ +if (args.length < 2) { + System.err.println("Usage: KinesisWordCount ") + System.exit(1) +} + +/** + * (This was lifted from the StreamingExamples.scala in order to avoid the dependency + * on the spark-examples artifact.) + * Set reasonable logging levels for streaming if the user has not configured log4j. + */ +val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements() +if (!log4jInitialized) { + /** + * We first log something to initialize Spark's default logging, + * then we override the logging level. + * */ + logInfo(&q
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15685837 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessorUtils.scala --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import scala.util.Random + +import org.apache.spark.Logging + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException + + +/** + * Helper for the KinesisRecordProcessor. + */ +private[kinesis] object KinesisRecordProcessorUtils extends Logging { --- End diff -- added the companion object --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
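KinesisRecordProcessorUtils imports the KCL's transient-failure exceptions (ThrottlingException and related types) together with scala.util.Random. That combination suggests retry logic with randomized backoff, although the diff is cut off before the body. The Java sketch below shows that general pattern under this assumption. It is not the PR's code: `RetrySketch`, `retryRandom`, and `RetryableException` are hypothetical names, and `RetryableException` stands in for errors such as ThrottlingException.

```java
import java.util.Random;
import java.util.function.Supplier;

// Hypothetical sketch of a retry helper; not the PR's KinesisRecordProcessorUtils.
final class RetrySketch {

    // Stand-in for a transient error such as the KCL's ThrottlingException.
    static final class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    private static final Random RANDOM = new Random();

    // Calls `action` up to `maxAttempts` times. Only RetryableException triggers
    // another attempt; any other exception propagates immediately. Between
    // attempts it sleeps a random time in [0, maxBackoffMillis) so that many
    // workers retrying at once do not hit the service in lockstep.
    static <T> T retryRandom(Supplier<T> action, int maxAttempts, long maxBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RetryableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RetryableException e) {
                last = e;
                if (attempt < maxAttempts && maxBackoffMillis > 0) {
                    sleepQuietly((long) (RANDOM.nextDouble() * maxBackoffMillis));
                }
            }
        }
        throw last; // every attempt failed with a retryable error
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
        }
    }
}
```

The sketch treats non-retryable failures (for example, an already shut-down worker) as fatal right away. Only errors explicitly marked transient get the backoff treatment.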
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15685828 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisStringRecordSerializer.scala --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging + +/** + * Implementation of KinesisRecordSerializer to convert Array[Byte] to/from String. + */ +class KinesisStringRecordSerializer extends KinesisRecordSerializer[String] with Logging { --- End diff -- i removed the Serializer abstraction and am just using basic byte[] <-> String conversions
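Once the serializer abstraction is removed, the "basic byte[] <-> String conversions" amount to encoding and decoding with a fixed charset. The sketch below assumes records are UTF-8 text. That is an assumption, since the comment does not name a charset. `RecordStrings` is a hypothetical helper name.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper; assumes Kinesis record payloads are UTF-8 encoded text.
final class RecordStrings {

    // String -> bytes, e.g. when putting a record onto the stream.
    static byte[] toBytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // bytes -> String, e.g. when decoding a record received by the stream.
    static String fromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Naming the charset explicitly, instead of calling `getBytes()` with no argument, keeps the producer and the receiver in agreement even when the JVMs have different platform default encodings.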
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50786500 ah, gotcha matei. the examples aren't part of the core, so they can depend on external libs. i'll make the change.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15653238 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param appName Kinesis Application Name. 
Kinesis Apps are mapped to Kinesis Streams + * by the Kinesis Client Library. If you change the App name or Stream name, + * the KCL will throw errors. + * @param stream Kinesis Stream Name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types + * of checkpoints. + * @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream + * (InitialPositionInStream.LATEST). + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk --- End diff -- right, i knew you would ask about this! scala can't differentiate between the two overloaded methods with default arguments in this situation: https://groups.google.com/forum/#!msg/scala-user/FyQK3-cqfaY/fXLHr8QsW_0J it's totes weird, i know. i'll remove storageLevel to simplify. i'd like to keep initialPositionInStream in case there's a need to differentiate.
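Scala rejects an overloaded method when more than one of its alternatives declares default arguments. That appears to be the limit the linked thread describes, and it applies here because both the StreamingContext and JavaStreamingContext variants of `createStream` had defaults. Java has no default arguments at all, so the usual substitute is a short overload that delegates to the full form. The sketch below is hypothetical. `StreamConfigSketch` and its `InitialPosition` enum are invented and only mirror the KCL's `InitialPositionInStream` values. The TRIM_HORIZON default follows the quoted doc comment.

```java
// Hypothetical sketch: emulating a default argument with a delegating overload.
final class StreamConfigSketch {

    // Mirrors the two values of the KCL's InitialPositionInStream discussed above.
    enum InitialPosition { TRIM_HORIZON, LATEST }

    final String appName;
    final String streamName;
    final String endpointUrl;
    final long checkpointIntervalMillis;
    final InitialPosition initialPosition;

    private StreamConfigSketch(String appName, String streamName, String endpointUrl,
                               long checkpointIntervalMillis, InitialPosition initialPosition) {
        this.appName = appName;
        this.streamName = streamName;
        this.endpointUrl = endpointUrl;
        this.checkpointIntervalMillis = checkpointIntervalMillis;
        this.initialPosition = initialPosition;
    }

    // Full form: every parameter is explicit.
    static StreamConfigSketch create(String appName, String streamName, String endpointUrl,
                                     long checkpointIntervalMillis, InitialPosition initialPosition) {
        if (checkpointIntervalMillis <= 0) {
            throw new IllegalArgumentException("checkpoint interval must be positive");
        }
        return new StreamConfigSketch(appName, streamName, endpointUrl,
                checkpointIntervalMillis, initialPosition);
    }

    // Convenience overload: plays the role of the Scala default argument,
    // starting from TRIM_HORIZON when no Kinesis checkpoint exists yet.
    static StreamConfigSketch create(String appName, String streamName, String endpointUrl,
                                     long checkpointIntervalMillis) {
        return create(appName, streamName, endpointUrl, checkpointIntervalMillis,
                InitialPosition.TRIM_HORIZON);
    }
}
```

Because only the full form carries the logic, adding or dropping a "defaulted" parameter (as was done with storageLevel) touches a single delegating method.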
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15651971 --- Diff: extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + * :: Experimental :: + */ +@Experimental +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param appName Kinesis Application Name. 
Kinesis Apps are mapped to Kinesis Streams + * by the Kinesis Client Library. If you change the App name or Stream name, + * the KCL will throw errors. + * @param stream Kinesis Stream Name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). --- End diff -- fixed
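The checkpoint-interval parameter controls how often the receiver records its Kinesis position. This is separate from Spark's own checkpointing. An earlier revision of KinesisUtils imports Spark's `Clock`, `SystemClock`, and `ManualClock`, which suggests a time-based check with a swappable clock. The Java sketch below shows that idea under this assumption. `CheckpointTimerSketch` is hypothetical, and a `LongSupplier` stands in for the clock so tests can advance time by hand.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: decide when a Kinesis checkpoint is due, using an
// injectable clock (a real clock in production, a manual one in tests).
final class CheckpointTimerSketch {
    private final long intervalMillis;
    private final LongSupplier clockMillis;
    private long lastCheckpointMillis;

    CheckpointTimerSketch(long intervalMillis, LongSupplier clockMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.intervalMillis = intervalMillis;
        this.clockMillis = clockMillis;
        this.lastCheckpointMillis = clockMillis.getAsLong(); // timer starts now
    }

    // Returns true, and restarts the timer, once at least intervalMillis have
    // elapsed since the last checkpoint; otherwise returns false.
    boolean shouldCheckpoint() {
        long now = clockMillis.getAsLong();
        if (now - lastCheckpointMillis >= intervalMillis) {
            lastCheckpointMillis = now;
            return true;
        }
        return false;
    }
}
```

In production the clock would be `System::currentTimeMillis`. A longer interval means fewer writes to the KCL's checkpoint store, at the cost of more records being replayed after a restart.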
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15651823 --- Diff: project/SparkBuild.scala --- @@ -62,7 +62,7 @@ object SparkBuild extends PomBuild { var isAlphaYarn = false var profiles: mutable.Seq[String] = mutable.Seq.empty if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) { - println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.") + println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.") --- End diff -- that -P wasn't working. new jira for the full monty: https://issues.apache.org/jira/browse/SPARK-2770
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15620896 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. 
+ * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( +ssc: StreamingContext, +app: String, +stream: String, +endpoint: String, +checkpointIntervalMillis: Long, +initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON, +storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): ReceiverInputDStream[Array[Byte]] = { + +ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, checkpointIntervalMillis, initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param JavaStreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The val
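The doc comment warns that defaulting to TRIM_HORIZON risks processing records more than once, and the example's own comments recommend idempotent processing. One common way to make a consumer tolerate replays is to remember the highest sequence number already handled in each shard and skip anything at or below it. The Java sketch below shows that technique. It is a hypothetical illustration, not part of the PR. It assumes records within a shard arrive in increasing sequence-number order, and it keeps its state in memory only. A real deployment would need that state to survive restarts.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: skip replayed records using a per-shard high-water mark.
// Assumes in-order delivery within a shard; state here is in-memory only.
final class DedupSketch {
    // Kinesis sequence numbers are large decimal strings, so BigInteger is used.
    private final Map<String, BigInteger> highWaterMark = new HashMap<>();

    // Returns true if the record is new and should be processed, false if it
    // is at or below the highest sequence number already seen for its shard.
    boolean firstTime(String shardId, String sequenceNumber) {
        BigInteger seq = new BigInteger(sequenceNumber);
        BigInteger seen = highWaterMark.get(shardId);
        if (seen != null && seq.compareTo(seen) <= 0) {
            return false;
        }
        highWaterMark.put(shardId, seq);
        return true;
    }
}
```

A high-water mark uses constant memory per shard, unlike a set of every sequence number seen, but it depends on the in-order assumption stated above.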
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15618561 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala --- @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming + +import java.nio.ByteBuffer +import org.apache.log4j.Level +import org.apache.log4j.Logger +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer +import org.apache.spark.streaming.kinesis.KinesisUtils +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.model.PutRecordRequest +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.dstream.DStream + +/** + * Kinesis Spark Streaming WordCount example. + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the given stream. + * It then starts pulling from the tip of the given and at the given . + * Because we're pulling from the tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts. + * This could lead to missed records if data is added to the stream while no KinesisReceivers are running. + * In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data + * depending on the checkpoint frequency. + * + * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency. 
+ * Record processing should be idempotent when possible. + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: KinesisWordCount + *is the name of the Kinesis stream (ie. mySparkStream) + *is the endpoint of the Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + *is the batch interval in millis (ie. 1000ms) + * + * Example: + * $ export AWS_ACCESS_KEY_ID= + * $ export AWS_SECRET_KEY= + *$ bin/run-kinesis-example \ + *org.apache.spark.examples.streaming.KinesisWordCount mySparkStream https://kinesis.us-east-1.amazonaws.com 100 + * + * There is a companion helper class below called KinesisWordCountProducer which puts dummy data onto the Kinesis stream. + * Usage instructions for KinesisWordCountProducer are provided in that class definition. + */ +object KinesisWordCount extends Logging { + val WordSeparator = " " + + def main(args: Array[String]) { +/** + * Check that all required args were passed in. + */ +if (args.length < 3) { + System.err.println("Usage: KinesisWordCount ") + System.exit(1) +} + +/** + * (This was lifted fr
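This revision of the example takes three positional arguments: the stream name, the Kinesis endpoint URL, and the batch interval in milliseconds. It rejects fewer than three. The argument placeholders in the quoted usage string were lost in extraction. The Java sketch below validates the same three inputs. The class name and the placeholder names in its usage message are hypothetical. The sketch also throws instead of calling `System.exit` so that the validation can be tested.

```java
// Hypothetical sketch of validating the example's three positional arguments.
final class ArgsSketch {
    // Placeholder names below are illustrative; the original usage text was lost.
    static final String USAGE =
        "Usage: KinesisWordCount <stream-name> <endpoint-url> <batch-interval-millis>";

    final String streamName;
    final String endpointUrl;
    final long batchIntervalMillis;

    private ArgsSketch(String streamName, String endpointUrl, long batchIntervalMillis) {
        this.streamName = streamName;
        this.endpointUrl = endpointUrl;
        this.batchIntervalMillis = batchIntervalMillis;
    }

    // Throws IllegalArgumentException (NumberFormatException is a subclass)
    // when arguments are missing, non-numeric, or non-positive.
    static ArgsSketch parse(String[] args) {
        if (args.length < 3) {
            throw new IllegalArgumentException(USAGE);
        }
        long interval = Long.parseLong(args[2]);
        if (interval <= 0) {
            throw new IllegalArgumentException("batch interval must be positive: " + interval);
        }
        return new ArgsSketch(args[0], args[1], interval);
    }
}
```

A `main` method would catch the exception, print `USAGE` to stderr, and exit with a non-zero status, as the quoted Scala does.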
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15618546 --- Diff: extras/spark-kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.examples.streaming; + +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.Milliseconds; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.dstream.DStream; +import org.apache.spark.streaming.kinesis.KinesisRecordSerializer; +import org.apache.spark.streaming.kinesis.KinesisStringRecordSerializer; +import org.apache.spark.streaming.kinesis.KinesisUtils; + +import scala.Tuple2; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +/** + * Java-friendly Kinesis Spark Streaming WordCount example + * + * See http://spark.apache.org/docs/latest/streaming-programming-guide.html for more details on the Kinesis Spark Streaming integration. + * + * This example spins up 1 Kinesis Worker (Spark Streaming Receivers) per shard of the given stream. + * It then starts pulling from the tip of the given and at the given . + * Because we're pulling from the tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts. 
+ * This could lead to missed records if data is added to the stream while no KinesisReceivers are running. + * In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data + * depending on the checkpoint frequency. + * InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency. + * Record processing should be idempotent when possible. + * + * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs + * Instance profile credentials - delivered through the Amazon EC2 metadata service + * + * Usage: JavaKinesisWordCount + * is the name of the Kinesis stream (ie. mySparkStream) + * is the endpoint of the Kinesis service (ie. https://kinesis.us-east-1.amazonaws.com) + * is the batch interval in milliseconds (ie. 1000ms) + * + * Example: + * $ export AWS_ACCESS_KEY_ID= + * $ export AWS_SECRET_KEY= + *$ bin/run-kinesis-example \ + *org.apache.spark.examples.streaming.JavaKinesisWordCount mySparkStream https://kinesis.us-east-1.amazonaws.com 1000 + * + * There is a companion helper class called KinesisWordCountProducer which puts dummy data onto the Kinesis stream.
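The Java example imports `Pattern`, `FlatMapFunction`, `PairFunction`, and `Function2`. This suggests the usual word-count pipeline of splitting each record on a space separator, pairing each word with 1, and summing by key. The plain-Java sketch below applies that per-batch logic to a list of already-decoded records, with no Spark dependency, to show what each batch computes. `WordCountSketch` is hypothetical. The single-space separator matches the Scala example's `WordSeparator`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch of the per-batch word count, without Spark.
final class WordCountSketch {
    private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");

    static Map<String, Integer> countWords(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {                          // like flatMap: line -> words
            for (String word : WORD_SEPARATOR.split(line)) {
                if (word.isEmpty()) {
                    continue;                                // leading or doubled spaces yield empty tokens
                }
                counts.merge(word, 1, Integer::sum);         // like mapToPair + reduceByKey
            }
        }
        return counts;
    }
}
```

In the streaming version, Spark runs the same split-and-sum per batch interval across the receivers' partitions rather than in a single loop.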
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15618503 --- Diff: extras/spark-kinesis-asl/src/test/resources/log4j.properties --- @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=WARN, console + --- End diff -- matched
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15618507 --- Diff: project/SparkBuild.scala --- @@ -60,9 +60,13 @@ object SparkBuild extends PomBuild { var isAlphaYarn = false var profiles: mutable.Seq[String] = mutable.Seq.empty if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) { - println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.") + println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.") profiles ++= Seq("spark-ganglia-lgpl") } +if (Properties.envOrNone("SPARK_KINESIS_ASL").isDefined) { --- End diff -- removed
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15618463 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. 
+ * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( +ssc: StreamingContext, +app: String, +stream: String, +endpoint: String, +checkpointIntervalMillis: Long, +initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON, +storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): ReceiverInputDStream[Array[Byte]] = { + +ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, checkpointIntervalMillis, initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param JavaStreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The val
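The `checkpointIntervalMillis` parameter above controls Kinesis (KCL) checkpointing, not Spark checkpointing. The file also imports `ManualClock` and `SystemClock`, which suggests the interval check may have been written against an injectable clock. The following is a minimal sketch of that idea, under several assumptions:

- `IntervalCheckpointer` is a hypothetical name.
- A `java.util.function.LongSupplier` stands in for Spark's clock classes.
- A `Runnable` stands in for the actual KCL checkpoint call.

```java
import java.util.function.LongSupplier;

/**
 * Sketch of interval-based checkpointing with an injectable clock.
 * Hypothetical class; the Runnable stands in for the real KCL checkpoint call.
 */
class IntervalCheckpointer {
    private final long intervalMillis;
    private final LongSupplier clock;  // System::currentTimeMillis in production, a fake clock in tests
    private final Runnable checkpoint; // hypothetical hook that would checkpoint with the KCL
    private long nextCheckpointAt;

    IntervalCheckpointer(long intervalMillis, LongSupplier clock, Runnable checkpoint) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        this.checkpoint = checkpoint;
        this.nextCheckpointAt = clock.getAsLong() + intervalMillis;
    }

    /**
     * Call after each processed batch of records. Checkpoints at most once per interval
     * and returns true only when it actually checkpointed.
     */
    boolean maybeCheckpoint() {
        long now = clock.getAsLong();
        if (now < nextCheckpointAt) {
            return false;
        }
        checkpoint.run();
        nextCheckpointAt = now + intervalMillis;
        return true;
    }
}
```

A longer interval means fewer checkpoint calls. After a restart, however, more records are replayed from the last checkpoint, which is the duplicate-processing risk the comment attaches to TRIM_HORIZON. Injecting the clock lets a test advance time deterministically instead of sleeping.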
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15617773 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. --- End diff -- fixed
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15617760 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { --- End diff -- done
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15617770 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. 
+ * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( +ssc: StreamingContext, +app: String, +stream: String, +endpoint: String, +checkpointIntervalMillis: Long, +initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON, +storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): ReceiverInputDStream[Array[Byte]] = { + +ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, checkpointIntervalMillis, initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param JavaStreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. --- End diff --
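The Scala `createStream` above relies on default arguments (TRIM_HORIZON and MEMORY_AND_DISK_2), and Java callers cannot use Scala defaults as defaults; they have to pass every argument. That is one reason a separate Java-friendly variant is useful. The following is a minimal sketch of the usual Java-side pattern, overloads that delegate with the same defaults. `StartPosition` and `KinesisStreamConfig` are hypothetical stand-ins, not Spark or KCL types, so the sketch runs without either library.

```java
import java.util.Objects;

/** Hypothetical stand-in for the two InitialPositionInStream values discussed above. */
enum StartPosition { TRIM_HORIZON, LATEST }

/** Hypothetical, dependency-free stand-in for the arguments createStream takes. */
final class KinesisStreamConfig {
    static final StartPosition DEFAULT_POSITION = StartPosition.TRIM_HORIZON;
    static final String DEFAULT_STORAGE_LEVEL = "MEMORY_AND_DISK_2"; // a String here, not Spark's StorageLevel

    final String app;
    final String stream;
    final String endpoint;
    final long checkpointIntervalMillis;
    final StartPosition initialPosition;
    final String storageLevel;

    private KinesisStreamConfig(String app, String stream, String endpoint,
                                long checkpointIntervalMillis, StartPosition initialPosition,
                                String storageLevel) {
        if (checkpointIntervalMillis <= 0) {
            throw new IllegalArgumentException("checkpointIntervalMillis must be positive");
        }
        this.app = Objects.requireNonNull(app, "app");
        this.stream = Objects.requireNonNull(stream, "stream");
        this.endpoint = Objects.requireNonNull(endpoint, "endpoint");
        this.checkpointIntervalMillis = checkpointIntervalMillis;
        this.initialPosition = Objects.requireNonNull(initialPosition, "initialPosition");
        this.storageLevel = Objects.requireNonNull(storageLevel, "storageLevel");
    }

    /** Full form: every argument explicit. */
    static KinesisStreamConfig of(String app, String stream, String endpoint,
                                  long checkpointIntervalMillis, StartPosition initialPosition,
                                  String storageLevel) {
        return new KinesisStreamConfig(app, stream, endpoint, checkpointIntervalMillis,
                initialPosition, storageLevel);
    }

    /** Java has no default arguments, so this overload supplies the same defaults as the Scala signature. */
    static KinesisStreamConfig of(String app, String stream, String endpoint,
                                  long checkpointIntervalMillis) {
        return of(app, stream, endpoint, checkpointIntervalMillis,
                DEFAULT_POSITION, DEFAULT_STORAGE_LEVEL);
    }
}
```

Keeping the defaults in one place (the constants) means the short overload cannot drift from the long form. The default of TRIM_HORIZON favors avoiding data loss over avoiding duplicates, as the Scaladoc notes.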
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15617745 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.net.InetAddress +import java.util.UUID +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker +import java.nio.ByteBuffer +import org.apache.spark.streaming.util.SystemClock + +/** + * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. 
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: + * https://github.com/awslabs/amazon-kinesis-client + * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here: + * http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor. + * + * @param app name --- End diff -- updated
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15617751 --- Diff: docs/streaming-programming-guide.md --- @@ -467,6 +468,62 @@ For more details on these additional sources, see the corresponding [API documen Furthermore, you can also implement your own custom receiver for your sources. See the [Custom Receiver Guide](streaming-custom-receivers.html). +### Kinesis --- End diff -- moved
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15616323 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.net.InetAddress +import java.util.UUID +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker +import java.nio.ByteBuffer +import org.apache.spark.streaming.util.SystemClock + +/** + * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. 
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: + * https://github.com/awslabs/amazon-kinesis-client + * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here: + * http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor. + * + * @param app name + * @param Kinesis stream name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * @param persistence strategy for RDDs and DStreams. + */ +private[streaming] class KinesisReceiver( + app: String, + stream: String, + endpoint: String, + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream, + storageLevel: StorageLevel) + extends Receiver[Array[Byte]](storageLevel) with Logging { receiver => + + /** + * The lazy val's below will get instantiated in the remote Executor after the closure is shipped to the Spark Worker. + * These are all lazy because they're from third-party Amazon libraries and are not Serializable. + * If they're not marked lazy, they will cause NotSerializableExceptions when they're shipped to the Spark Worker. + */ + + /** + * workerId is lazy because we want the address of the actual Worker where the code runs - not the Driver's ip address. + * This makes a difference when running in a cluster. 
+ */ + lazy val workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID() + + /** + * This impl uses the DefaultAWSCredentialsProviderChain per the following url: + * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html + * and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI + * Instance profile credentials deli
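The receiver comments above explain two things: the AWS objects are lazy vals so they are created on the executor rather than serialized from the driver, and `workerId` is computed lazily so it reflects the executor's host address. Java has no `lazy val`, so the following is a sketch of the usual Java analogue, a `transient` field filled on first access. Its `localWorkerId()` mirrors the quoted `host address + ":" + UUID` expression. All class and method names are hypothetical.

One caveat: a Scala lazy val that has already been evaluated on the driver is serialized with the object, which is why Spark code often writes `@transient lazy val`. A `transient` Java field is skipped either way.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;

/** Hypothetical receiver-like class: serializable config plus a lazily built, non-serializable client. */
class LazyClientHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Stand-in for a third-party client that does not implement Serializable. */
    static final class NonSerializableClient {
        final String workerId;

        NonSerializableClient(String workerId) {
            this.workerId = workerId;
        }
    }

    private final String streamName;                // plain config travels with the object
    private transient NonSerializableClient client; // never serialized; rebuilt on first use

    LazyClientHolder(String streamName) {
        this.streamName = streamName;
    }

    /** Creates the client on first call, on whichever JVM calls it (an executor, in Spark's case). */
    synchronized NonSerializableClient client() {
        if (client == null) {
            client = new NonSerializableClient(localWorkerId());
        }
        return client;
    }

    synchronized boolean clientCreated() {
        return client != null;
    }

    String streamName() {
        return streamName;
    }

    /** Local host address + ":" + random UUID, mirroring the quoted workerId expression. */
    static String localWorkerId() {
        String host;
        try {
            host = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            host = "unknown-host"; // fallback added for this sketch only
        }
        return host + ":" + UUID.randomUUID();
    }

    /** Serializes and deserializes, standing in for shipping the object to another JVM. */
    static LazyClientHolder roundTrip(LazyClientHolder h) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(h);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyClientHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

Without `transient`, calling `client()` before `roundTrip` would make serialization fail with `NotSerializableException`, which is the failure the comment describes. With it, the copy starts with no client and builds its own, with its own host-specific `workerId`.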
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15616231 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kinesis + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException +import org.apache.spark.Logging +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import scala.util.Random +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.util.Clock +import org.apache.spark.streaming.util.SystemClock + +/** + * Facade to create the Scala-based or Java-based streams. + * Also, contains a reusable utility methods. + */ +object KinesisUtils extends Logging { + /** + * Create an InputDStream that pulls messages from a Kinesis stream. + * + * @param StreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. 
+ * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory and on-disk to 2 nodes total (primary and secondary) + * + * @return ReceiverInputDStream[Array[Byte]] + */ + def createStream( +ssc: StreamingContext, +app: String, +stream: String, +endpoint: String, +checkpointIntervalMillis: Long, +initialPositionInStream: InitialPositionInStream = InitialPositionInStream.TRIM_HORIZON, +storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): ReceiverInputDStream[Array[Byte]] = { + +ssc.receiverStream(new KinesisReceiver(app, stream, endpoint, checkpointIntervalMillis, initialPositionInStream, storageLevel)) + } + + /** + * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream. + * + * @param JavaStreamingContext object + * @param app name + * @param stream name + * @param endpoint + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk of processing records more than once. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The val
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15615786 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis + +import java.net.InetAddress +import java.util.UUID +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker +import java.nio.ByteBuffer +import org.apache.spark.streaming.util.SystemClock + +/** + * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. 
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: + * https://github.com/awslabs/amazon-kinesis-client + * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here: + * http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor. + * + * @param app name + * @param Kinesis stream name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * @param persistence strategy for RDDs and DStreams. + */ +private[streaming] class KinesisReceiver( + app: String, + stream: String, + endpoint: String, + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream, + storageLevel: StorageLevel) + extends Receiver[Array[Byte]](storageLevel) with Logging { receiver => + + /** + * The lazy val's below will get instantiated in the remote Executor after the closure is shipped to the Spark Worker. + * These are all lazy because they're from third-party Amazon libraries and are not Serializable. + * If they're not marked lazy, they will cause NotSerializableExceptions when they're shipped to the Spark Worker. + */ + + /** + * workerId is lazy because we want the address of the actual Worker where the code runs - not the Driver's ip address. + * This makes a difference when running in a cluster. 
+ */ + lazy val workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID() + + /** + * This impl uses the DefaultAWSCredentialsProviderChain per the following url: + * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html + * and searches for credentials in the following order of precedence: + * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY + * Java System Properties - aws.accessKeyId and aws.secretKey + * Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI + * Instance profile credentials deli
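The lazy-val comment in the diff says the Amazon objects are deferred so they are only built on the executor and never serialized on the driver. Below is a small, self-contained sketch of that idea. `ThirdPartyClient` and `ShippedReceiver` are hypothetical stand-ins, not KCL or Spark classes. A plain Java-serialization round trip stands in for shipping the receiver to an executor. The diff relies on `lazy` alone, which works as long as the fields are never touched on the driver. The `@transient` annotation is an addition in this sketch that also keeps an already-initialized field out of the serialized form.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for a third-party class that is NOT Serializable
// (playing the role of a KCL Worker or an AWS client).
class ThirdPartyClient {
  def describe: String = "third-party client"
}

// Hypothetical stand-in for a receiver that is serialized on the driver
// and deserialized on an executor.
class ShippedReceiver(val streamName: String) extends Serializable {
  // lazy: built on first access, on whichever JVM touches it first.
  // @transient: never written to the stream, so no NotSerializableException
  // even if the field was already initialized before serialization.
  @transient lazy val client: ThirdPartyClient = new ThirdPartyClient
}

object LazyFieldDemo {
  // Java-serialization round trip, standing in for shipping the object to an executor.
  def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    // Resolve classes with this code's class loader so the demo also works under build tools.
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, LazyFieldDemo.getClass.getClassLoader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val onDriver = new ShippedReceiver("myStream")
    println(onDriver.client.describe)   // initialized on the "driver" before shipping
    val onExecutor = roundTrip(onDriver)
    println(onExecutor.streamName)      // regular fields survive the round trip
    println(onExecutor.client.describe) // transient lazy field is rebuilt on first access
  }
}
```

Without `@transient`, touching `client` on the driver before serialization would make `writeObject` fail with `NotSerializableException`, which is the failure the diff comment warns about.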
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r1852 --- Diff: extras/spark-kinesis-asl/pom.xml --- @@ -0,0 +1,98 @@ + --- End diff -- makes sense. i'll update.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r1717 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -0,0 +1,122 @@ +/** + * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: + * https://github.com/awslabs/amazon-kinesis-client + * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here: + * http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor. + * + * @param app name + * @param Kinesis stream name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * @param persistence strategy for RDDs and DStreams. + */ +private[streaming] class KinesisReceiver( + app: String, + stream: String, + endpoint: String, + checkpointIntervalMillis: Long, + initialPositionInStream: InitialPositionInStream, + storageLevel: StorageLevel) + extends Receiver[Array[Byte]](storageLevel) with Logging { receiver => + + /** + * The lazy val's below will get instantiated in the remote Executor after the closure is shipped to the Spark Worker. + * These are all lazy because they're from third-party Amazon libraries and are not Serializable. + * If they're not marked lazy, they will cause NotSerializableExceptions when they're shipped to the Spark Worker. + */ + + /** + * workerId is lazy because we want the address of the actual Worker where the code runs - not the Driver's ip address. + * This makes a difference when running in a cluster. 
+ */ + lazy val workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID() --- End diff -- there can be multiple workers per host, so i can't just use the host address. but to answer your question, i guess i don't really need the host address since i'm generating a random UUID. however, i found it useful when reviewing logs for debugging purposes. i'll keep for now unless you have a strong objection. good catch.
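The reasoning in this reply (the UUID alone gives uniqueness when several workers share a host; the host prefix only helps when reading logs) can be sketched as follows. `WorkerIds` and `workerId(hostAddress)` are hypothetical names; the host is passed in as a parameter so the uniqueness property can be checked without depending on the local network setup.

```scala
import java.net.InetAddress
import java.util.UUID

object WorkerIds {
  // Host prefix is for readability in logs only; the random UUID provides uniqueness,
  // which matters because several workers can run on the same host.
  def workerId(hostAddress: String): String = s"$hostAddress:${UUID.randomUUID()}"

  // Mirrors the diff: resolved lazily so it reflects the host where the code runs
  // (the executor), not the driver that constructed the object.
  lazy val localWorkerId: String = workerId(InetAddress.getLocalHost.getHostAddress)

  def main(args: Array[String]): Unit = {
    val a = workerId("10.0.0.5")
    val b = workerId("10.0.0.5")
    println(a)
    println(b)
    println(a != b) // two workers on the same host still get distinct ids
  }
}
```

One design note: an IPv6 host address itself contains colons, so a log parser should not split these ids on the first `:`.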
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r1598 --- Diff: extras/spark-kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -0,0 +1,122 @@ +/** + * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: + * https://github.com/awslabs/amazon-kinesis-client + * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here: + * http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Instances of this class will get shipped to the Spark Streaming Workers to run within a Spark Executor. + * + * @param app name + * @param Kinesis stream name + * @param endpoint url of Kinesis service + * @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing). + * See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints. + * @param in the absence of Kinesis checkpoint info, this is the worker's initial starting position in the stream. + * The values are either the beginning of the stream per Kinesis' limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) + * or the tip of the stream using InitialPositionInStream.LATEST. + * @param persistence strategy for RDDs and DStreams. + */ +private[streaming] class KinesisReceiver( + app: String, --- End diff -- good catch. i went through and fixed another similar formatting mistake, as well.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r1270 --- Diff: bin/run-kinesis-example --- @@ -0,0 +1,60 @@ +#!/usr/bin/env bash + --- End diff -- yup, yup. this is a special circumstance due to the licensing. i didn't want to complicate the examples build by depending on an optional package, so this was the best workaround. i put the scripts alongside the other scripts to reduce confusion, but i can move them to extras/kinesis/bin for sure. i'll update the docs accordingly.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50253442 this PR worked for @srosenthal, btw.
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50253422 also, can someone address the questions i have here regarding the ec2 scripts and other peripheral aspects of this PR: https://issues.apache.org/jira/browse/SPARK-1981?focusedCommentId=14072761 thanks! -chris
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/1434#issuecomment-50253400 @mateiz - this is a completely brand-new, from-scratch implementation. parviz's old code was actually a Scala port of the Java-based Kinesis sample app found here: https://github.com/aws/aws-sdk-java/blob/master/src/samples/AmazonKinesisApplication/SampleRecordProcessor.java this was fine for a quick/dirty sample of kinesis functionality, but my goal was to make this code more reusable, testable, readable, configurable, production-ready, and well-documented. the old code did not support the new Streaming 1.0 API and extras/ build structure (due to ASL-license). i also updated the AWS Java SDK and Kinesis Client Libraries to their latest versions. here is parviz's PR for reference/comparison: https://github.com/apache/spark/pull/223. i've addressed all of the comments provided in the old PR to speed up the acceptance of this new PR. thanks and i look forward to getting this merged! -chris
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/1434#discussion_r15141666 --- Diff: project/SparkBuild.scala --- @@ -63,6 +63,10 @@ object SparkBuild extends PomBuild { println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.") profiles ++= Seq("spark-ganglia-lgpl") } +if (Properties.envOrNone("SPARK_KINESIS_ASL").isDefined) { + println("NOTE: SPARK_KINESIS_ASL is deprecated, please use -Pspark-kinesis-asl flag.") + profiles ++= Seq("spark-ganglia-lgpl") --- End diff -- good catch, stephen. thanks!
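The bug caught here is a copy-paste slip: the `SPARK_KINESIS_ASL` branch appends the ganglia profile instead of the Kinesis one. One way to make that class of mistake harder is to keep each deprecated variable next to the profile it enables. The sketch below is hypothetical (`BuildProfiles` and `profilesFor` are not part of SparkBuild.scala); it takes the environment as a `Map` instead of calling `Properties.envOrNone` so it can be tested, and it takes the profile names from the `profiles ++=` lines and NOTE messages in the diff.

```scala
object BuildProfiles {
  // Each deprecated environment variable paired with the profile it enables,
  // so the variable checked and the profile added cannot drift apart.
  val deprecatedEnvToProfile: Seq[(String, String)] = Seq(
    "SPARK_GANGLIA_LGPL" -> "spark-ganglia-lgpl",
    "SPARK_KINESIS_ASL"  -> "spark-kinesis-asl"
  )

  // Returns the profiles to enable for the given environment, printing a
  // deprecation note for each variable that is set.
  def profilesFor(env: Map[String, String]): Seq[String] = {
    val enabled = deprecatedEnvToProfile.filter { case (envVar, _) => env.contains(envVar) }
    enabled.foreach { case (envVar, profile) =>
      println(s"NOTE: $envVar is deprecated, please use -P$profile flag.")
    }
    enabled.map(_._2)
  }

  def main(args: Array[String]): Unit =
    println(profilesFor(sys.env))
}
```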
[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support
GitHub user cfregly opened a pull request: https://github.com/apache/spark/pull/1434 [SPARK-1981] Add AWS Kinesis streaming support You can merge this pull request into a Git repository by running: $ git pull https://github.com/cfregly/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1434.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1434 commit b3b0ff118cac3c0a5a10f9912b383bb0665c9a1b Author: Chris Fregly Date: 2014-07-16T07:03:04Z [SPARK-1981] Add AWS Kinesis streaming support
[GitHub] spark pull request: support for Kinesis
Github user cfregly commented on the pull request: https://github.com/apache/spark/pull/223#issuecomment-44760308 update: i discussed this with parviz recently, and we agreed that i would take this over. new PR to come shortly. here's the jira ticket: https://issues.apache.org/jira/browse/SPARK-1981
[GitHub] spark pull request: support for Kinesis
Github user cfregly commented on a diff in the pull request: https://github.com/apache/spark/pull/223#discussion_r11101110 --- Diff: examples/src/main/scala/org/apache/spark/streaming/examples/KinesisWordCount.scala --- @@ -0,0 +1,35 @@ +package org.apache.spark.streaming.examples + +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.kinesis.KinesisUtils +import org.apache.spark.streaming.StreamingContext._ + + +object KinesisWordCount { --- End diff -- @pdeyhim - per our offline convo this wknd, please add a note about running this demo with a minimum of master=local[2] (2 threads). otherwise it appears that the KinesisNetworkReceiver thread does not start up, breaking the demo.
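The local[2] minimum matches the general rule in the Spark Streaming guide: each receiver occupies one thread, so a local master needs more threads than there are receivers, or nothing is left to process the received batches. The helper below is a hypothetical pre-flight check sketched under that rule. It only parses `local`, `local[*]` and `local[N]`, and for any other master string it cannot tell, so it does not flag it.

```scala
object LocalMasterCheck {
  private val LocalN = """local\[(\d+)\]""".r

  // Number of threads a local master string provides; None for anything else
  // (cluster masters, or local forms this sketch does not parse).
  def localThreads(master: String): Option[Int] = master match {
    case "local"    => Some(1)
    case "local[*]" => Some(Runtime.getRuntime.availableProcessors())
    case LocalN(n)  => Some(n.toInt)
    case _          => None
  }

  // Each receiver pins one thread, so at least one more is needed for processing.
  // Masters this sketch cannot interpret are not flagged.
  def hasRoomForProcessing(master: String, numReceivers: Int): Boolean =
    localThreads(master).forall(_ > numReceivers)

  def main(args: Array[String]): Unit = {
    println(hasRoomForProcessing("local", 1))    // one Kinesis receiver, no spare thread
    println(hasRoomForProcessing("local[2]", 1)) // the minimum suggested in the comment
  }
}
```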