This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 907074bafad [SPARK-38881][DSTREAMS][KINESIS][PYSPARK] Added Support 
for CloudWatch MetricsLevel Config
907074bafad is described below

commit 907074bafad0da3d1c802a4389589658ecf93432
Author: Mark Khaitman <mkhait...@freewheel.com>
AuthorDate: Sat Apr 16 21:30:15 2022 -0500

    [SPARK-38881][DSTREAMS][KINESIS][PYSPARK] Added Support for CloudWatch 
MetricsLevel Config
    
    JIRA: https://issues.apache.org/jira/browse/SPARK-38881
    
    ### What changes were proposed in this pull request?
    
    This PR exposes the metricsLevel configuration option in the PySpark API. 
The option controls CloudWatch metrics reporting when consuming from an AWS 
Kinesis stream and is already available in the Scala/Java Spark APIs.
    
    This relates to https://issues.apache.org/jira/browse/SPARK-27420, which 
was merged as part of Spark 3.0.0.
    
    ### Why are the changes needed?
    
    This change is desirable as it further exposes the metricsLevel config 
parameter that was added for the Scala/Java Spark APIs when working with the 
Kinesis Streaming integration, and makes it available to the PySpark API as 
well.
    
    ### Does this PR introduce _any_ user-facing change?
    
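    No change to default behavior: MetricsLevel.DETAILED is maintained. Note 
that metricsLevel is a new parameter of KinesisUtils.createStream placed 
before storageLevel, so callers that pass storageLevel positionally must be 
updated.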
    
    ### How was this patch tested?
    
    This change passes all existing tests. Local testing was also done with a 
development Kinesis stream in AWS. It confirmed that metrics were no longer 
reported to CloudWatch after specifying MetricsLevel.NONE when creating the 
PySpark Kinesis stream, and that leaving the MetricsLevel parameter out still 
results in the default of DETAILED, with CloudWatch metrics appearing again.
    
    Built with:
    ```
    # ./build/mvn -pl :spark-streaming-kinesis-asl_2.12 -DskipTests 
-Pkinesis-asl clean install
    ```
    
    Tested with small pyspark kinesis streaming context + AWS kinesis stream, 
using updated streaming kinesis asl jar:
    
    ```
    # spark-submit --packages 
org.apache.spark:spark-streaming-kinesis-asl_2.12:3.2.1 --jars 
spark/connector/kinesis-asl/target/spark-streaming-kinesis-asl_2.12-3.4.0-SNAPSHOT.jar
 metricsLevelTesting.py
    ```
    
    Closes #36201 from mkman84/metricsLevel-pyspark.
    
    Authored-by: Mark Khaitman <mkhait...@freewheel.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../kinesis/KinesisUtilsPythonHelper.scala         | 10 ++++++++++
 docs/streaming-kinesis-integration.md              | 10 ++++++----
 python/pyspark/streaming/kinesis.py                | 22 +++++++++++++++++-----
 python/pyspark/streaming/tests/test_kinesis.py     |  5 ++++-
 4 files changed, 37 insertions(+), 10 deletions(-)

diff --git 
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala
 
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala
index 0056438c4ee..8abaef6b834 100644
--- 
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala
+++ 
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.streaming.kinesis
 
 import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Duration
@@ -37,6 +38,7 @@ private class KinesisUtilsPythonHelper {
       regionName: String,
       initialPositionInStream: Int,
       checkpointInterval: Duration,
+      metricsLevel: Int,
       storageLevel: StorageLevel,
       awsAccessKeyId: String,
       awsSecretKey: String,
@@ -64,6 +66,13 @@ private class KinesisUtilsPythonHelper {
           "InitialPositionInStream.LATEST or 
InitialPositionInStream.TRIM_HORIZON")
     }
 
+    val cloudWatchMetricsLevel = metricsLevel match {
+      case 0 => MetricsLevel.DETAILED
+      case 1 => MetricsLevel.SUMMARY
+      case 2 => MetricsLevel.NONE
+      case _ => MetricsLevel.DETAILED
+    }
+
     val builder = KinesisInputDStream.builder.
       streamingContext(jssc).
       checkpointAppName(kinesisAppName).
@@ -72,6 +81,7 @@ private class KinesisUtilsPythonHelper {
       regionName(regionName).
       
initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(kinesisInitialPosition)).
       checkpointInterval(checkpointInterval).
+      metricsLevel(cloudWatchMetricsLevel).
       storageLevel(storageLevel)
 
     if (stsAssumeRoleArn != null && stsSessionName != null && stsExternalId != 
null) {
diff --git a/docs/streaming-kinesis-integration.md 
b/docs/streaming-kinesis-integration.md
index dc80ff05226..2ce30d7efe2 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -56,6 +56,7 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
                 .initialPosition([initial position])
                 .checkpointAppName([Kinesis app name])
                 .checkpointInterval([checkpoint interval])
+                .metricsLevel([metricsLevel.DETAILED])
                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
                 .build()
 
@@ -78,6 +79,7 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
                 .initialPosition([initial position])
                 .checkpointAppName([Kinesis app name])
                 .checkpointInterval([checkpoint interval])
+                .metricsLevel([metricsLevel.DETAILED])
                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
                 .build();
 
@@ -90,20 +92,20 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
 
             kinesisStream = KinesisUtils.createStream(
                 streamingContext, [Kinesis app name], [Kinesis stream name], 
[endpoint URL],
-                [region name], [initial position], [checkpoint interval], 
StorageLevel.MEMORY_AND_DISK_2)
+                [region name], [initial position], [checkpoint interval], 
[metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)
 
        See the [API docs](api/python/reference/pyspark.streaming.html#kinesis)
        and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py).
 Refer to the [Running the Example](#running-the-example) subsection for 
instructions to run the example.
 
+       - CloudWatch metrics level and dimensions. See [the AWS documentation 
about monitoring 
KCL](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html) 
for details. Default is MetricsLevel.DETAILED
+
        </div>
        </div>
 
-       You may also provide the following settings. These are currently only 
supported in Scala and Java.
+       You may also provide the following settings. This is currently only 
supported in Scala and Java.
 
        - A "message handler function" that takes a Kinesis `Record` and 
returns a generic object `T`, in case you would like to use other data included 
in a `Record` such as partition key.
 
-       - CloudWatch metrics level and dimensions. See [the AWS documentation 
about monitoring 
KCL](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html) 
for details.
-
        <div class="codetabs">
        <div data-lang="scala" markdown="1">
                 import collection.JavaConverters._
diff --git a/python/pyspark/streaming/kinesis.py 
b/python/pyspark/streaming/kinesis.py
index 150fb79f572..0eede341f9b 100644
--- a/python/pyspark/streaming/kinesis.py
+++ b/python/pyspark/streaming/kinesis.py
@@ -23,7 +23,16 @@ from pyspark.streaming.context import StreamingContext
 from pyspark.util import _print_missing_jar
 
 
-__all__ = ["KinesisUtils", "InitialPositionInStream", "utf8_decoder"]
+__all__ = ["KinesisUtils", "InitialPositionInStream", "MetricsLevel", 
"utf8_decoder"]
+
+
+class InitialPositionInStream:
+    LATEST, TRIM_HORIZON = (0, 1)
+
+
+class MetricsLevel:
+    DETAILED, SUMMARY, NONE = (0, 1, 2)
+
 
 T = TypeVar("T")
 
@@ -46,6 +55,7 @@ class KinesisUtils:
         regionName: str,
         initialPositionInStream: str,
         checkpointInterval: int,
+        metricsLevel: int = MetricsLevel.DETAILED,
         storageLevel: StorageLevel = ...,
         awsAccessKeyId: Optional[str] = ...,
         awsSecretKey: Optional[str] = ...,
@@ -66,6 +76,7 @@ class KinesisUtils:
         regionName: str,
         initialPositionInStream: str,
         checkpointInterval: int,
+        metricsLevel: int = MetricsLevel.DETAILED,
         storageLevel: StorageLevel = ...,
         awsAccessKeyId: Optional[str] = ...,
         awsSecretKey: Optional[str] = ...,
@@ -85,6 +96,7 @@ class KinesisUtils:
         regionName: str,
         initialPositionInStream: str,
         checkpointInterval: int,
+        metricsLevel: int = MetricsLevel.DETAILED,
         storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2,
         awsAccessKeyId: Optional[str] = None,
         awsSecretKey: Optional[str] = None,
@@ -123,6 +135,9 @@ class KinesisUtils:
             Checkpoint interval(in seconds) for Kinesis checkpointing. See the 
Kinesis
             Spark Streaming documentation for more details on the different
             types of checkpoints.
+        metricsLevel : int
+            Level of CloudWatch PutMetrics.
+            Can be set to either DETAILED, SUMMARY, or NONE. (default is 
DETAILED)
         storageLevel : :class:`pyspark.StorageLevel`, optional
             Storage level to use for storing the received objects (default is
             StorageLevel.MEMORY_AND_DISK_2)
@@ -178,6 +193,7 @@ class KinesisUtils:
             regionName,
             initialPositionInStream,
             jduration,
+            metricsLevel,
             jlevel,
             awsAccessKeyId,
             awsSecretKey,
@@ -187,7 +203,3 @@ class KinesisUtils:
         )
         stream: DStream = DStream(jstream, ssc, NoOpSerializer())
         return stream.map(lambda v: decoder(v))
-
-
-class InitialPositionInStream:
-    LATEST, TRIM_HORIZON = (0, 1)
diff --git a/python/pyspark/streaming/tests/test_kinesis.py 
b/python/pyspark/streaming/tests/test_kinesis.py
index 221ec4dd984..7b09f5b8f5d 100644
--- a/python/pyspark/streaming/tests/test_kinesis.py
+++ b/python/pyspark/streaming/tests/test_kinesis.py
@@ -18,7 +18,7 @@ import time
 import unittest
 
 from pyspark import StorageLevel
-from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream, 
MetricsLevel
 from pyspark.testing.streamingutils import (
     should_test_kinesis,
     kinesis_requirement_message,
@@ -38,6 +38,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
             "us-west-2",
             InitialPositionInStream.LATEST,
             2,
+            MetricsLevel.DETAILED,
             StorageLevel.MEMORY_AND_DISK_2,
         )
         KinesisUtils.createStream(
@@ -48,6 +49,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
             "us-west-2",
             InitialPositionInStream.LATEST,
             2,
+            MetricsLevel.DETAILED,
             StorageLevel.MEMORY_AND_DISK_2,
             "awsAccessKey",
             "awsSecretKey",
@@ -69,6 +71,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
                 kinesisTestUtils.regionName(),
                 InitialPositionInStream.LATEST,
                 10,
+                MetricsLevel.DETAILED,
                 StorageLevel.MEMORY_ONLY,
                 aWSCredentials.getAWSAccessKeyId(),
                 aWSCredentials.getAWSSecretKey(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to