This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir.git


The following commit(s) were added to refs/heads/master by this push:
     new 62df110  [BAHIR-233] Add SNS message support for SQS streaming source 
(#97)
62df110 is described below

commit 62df1108145ee0305c5c7416a7dadeae5930aab8
Author: DmitryGrb <[email protected]>
AuthorDate: Sat Dec 12 22:16:46 2020 +0300

    [BAHIR-233] Add SNS message support for SQS streaming source (#97)
    
    Added a messageWrapper option for the SQS streaming connector
    which indicates whether the message is a pure S3 notification event
    or one coming from an SNS topic
---
 sql-streaming-sqs/README.md                        | 12 +++++++++++
 .../apache/spark/sql/streaming/sqs/SqsClient.scala | 23 ++++++++++++++++++++--
 .../spark/sql/streaming/sqs/SqsSourceOptions.scala | 16 +++++++++++++++
 .../sql/streaming/sqs/SqsSourceOptionsSuite.scala  |  2 ++
 4 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/sql-streaming-sqs/README.md b/sql-streaming-sqs/README.md
index b0555d6..8dd8a54 100644
--- a/sql-streaming-sqs/README.md
+++ b/sql-streaming-sqs/README.md
@@ -63,6 +63,18 @@ shouldSortFiles|true|whether to sort files based on 
timestamp while listing them
 useInstanceProfileCredentials|false|Whether to use EC2 instance profile 
credentials for connecting to Amazon SQS
 maxFilesPerTrigger|no default value|maximum number of files to process in a 
microbatch
 maxFileAge|7d|Maximum age of a file that can be found in this directory
+messageWrapper|None| - 'None' if SQS contains a plain S3 message. <br/> - 'SNS' 
if SQS contains an S3 notification message that came from SNS. <br/> Please see 
the 'Use multiple consumers' section for more details 
+
+## Use multiple consumers
+
+SQS cannot be read by multiple consumers. <br/> 
+If an S3 path should be listened to by multiple applications, the following approach is 
recommended: S3 -> SNS -> SQS:
+1. Create multiple SQS queues. Each application listens to one SQS queue.
+2. Create 1 SNS topic
+3. Once an S3 notification event is pushed to the SNS topic, it will be delivered to 
each SQS queue 
+
+Thus, one S3 path can be processed by multiple applications. <br/>
+ 
 
 ## Example
 
diff --git 
a/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
 
b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
index 1d9bb97..72b4568 100644
--- 
a/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
+++ 
b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
@@ -28,7 +28,7 @@ import com.amazonaws.services.sqs.{AmazonSQS, 
AmazonSQSClientBuilder}
 import com.amazonaws.services.sqs.model.{DeleteMessageBatchRequestEntry, 
Message, ReceiveMessageRequest}
 import org.apache.hadoop.conf.Configuration
 import org.json4s.{DefaultFormats, MappingException}
-import org.json4s.JsonAST.JValue
+import org.json4s.JsonAST.{JNothing, JValue}
 import org.json4s.jackson.JsonMethods.parse
 
 import org.apache.spark.SparkException
@@ -131,13 +131,32 @@ class SqsClient(sourceOptions: SqsSourceOptions,
     }
   }
 
+  private def tryToParseSNS(parsedBody: JValue): JValue = {
+    implicit val formats = DefaultFormats
+    parsedBody \ "Message" match {
+      case JNothing => throw new MappingException("Original message does not 
look like SNS one. " +
+        "Please check your setup and make sure it is S3 notification event 
coming from SNS")
+      case value => parse(value.extract[String])
+    }
+  }
+
+  private def extractS3Message(parsedBody: JValue): JValue = {
+    sourceOptions.messageWrapper match {
+      case sourceOptions.S3MessageWrapper.None => parsedBody
+      case sourceOptions.S3MessageWrapper.SNS => tryToParseSNS(parsedBody)
+    }
+  }
+
   private def parseSqsMessages(messageList: Seq[Message]): Seq[(String, Long, 
String)] = {
     val errorMessages = scala.collection.mutable.ListBuffer[String]()
     val parsedMessages = messageList.foldLeft(Seq[(String, Long, String)]()) { 
(list, message) =>
       implicit val formats = DefaultFormats
       try {
         val messageReceiptHandle = message.getReceiptHandle
-        val messageJson = parse(message.getBody).extract[JValue]
+
+        val parsedBody: JValue = parse(message.getBody).extract[JValue]
+        val messageJson = extractS3Message(parsedBody)
+
         val bucketName = (
           messageJson \ "Records" \ "s3" \ "bucket" \ 
"name").extract[Array[String]].head
         val eventName = (messageJson \ "Records" \ 
"eventName").extract[Array[String]].head
diff --git 
a/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala
 
b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala
index a4c0cc1..0c2eda0 100644
--- 
a/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala
+++ 
b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala
@@ -28,6 +28,15 @@ import org.apache.spark.util.Utils
  */
 class SqsSourceOptions(parameters: CaseInsensitiveMap[String]) extends Logging 
{
 
+  object S3MessageWrapper extends Enumeration {
+    type MessageFormat = Value
+    val None, SNS = Value
+
+    def withNameOpt(opt: String): Option[Value] = {
+      values.find(_.toString.toLowerCase == opt.toLowerCase)
+    }
+  }
+
   def this(parameters: Map[String, String]) = 
this(CaseInsensitiveMap(parameters))
 
   val maxFilesPerTrigger: Option[Int] = 
parameters.get("maxFilesPerTrigger").map { str =>
@@ -92,6 +101,13 @@ class SqsSourceOptions(parameters: 
CaseInsensitiveMap[String]) extends Logging {
     throw new IllegalArgumentException("Specifying file format is mandatory 
with sqs source")
   }
 
+  val messageWrapper: S3MessageWrapper.Value = 
parameters.get("messageWrapper").map( str =>
+    S3MessageWrapper.withNameOpt(str).getOrElse({
+      throw new IllegalArgumentException(s"Invalid value '$str' for option 
'messageWrapper', " +
+        s"must be one of [${S3MessageWrapper.values.mkString(", ")}]")
+    })
+  ).getOrElse(S3MessageWrapper.None)
+
   val ignoreFileDeletion: Boolean = withBooleanParameter("ignoreFileDeletion", 
false)
 
    /**
diff --git 
a/sql-streaming-sqs/src/test/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptionsSuite.scala
 
b/sql-streaming-sqs/src/test/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptionsSuite.scala
index 6382fb1..e43bae3 100644
--- 
a/sql-streaming-sqs/src/test/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptionsSuite.scala
+++ 
b/sql-streaming-sqs/src/test/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptionsSuite.scala
@@ -74,6 +74,8 @@ class SqsSourceOptionsSuite extends StreamTest {
       "for option 'shouldSortFiles', must be true or false")
     testBadOptions("useInstanceProfileCredentials" -> "x")("Invalid value 'x' 
" +
       "for option 'useInstanceProfileCredentials', must be true or false")
+    testBadOptions("messageWrapper" -> "x")("Invalid value 'x' " +
+      "for option 'messageWrapper', must be one of [none, sns]")
 
   }
 

Reply via email to