This is an automated email from the ASF dual-hosted git repository.
lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir.git
The following commit(s) were added to refs/heads/master by this push:
new 62df110 [BAHIR-233] Add SNS message support for SQS streaming source
(#97)
62df110 is described below
commit 62df1108145ee0305c5c7416a7dadeae5930aab8
Author: DmitryGrb <[email protected]>
AuthorDate: Sat Dec 12 22:16:46 2020 +0300
[BAHIR-233] Add SNS message support for SQS streaming source (#97)
Added a messageWrapper option for the SQS streaming connector
which indicates whether the message is a pure S3 notification event
or one coming from an SNS topic
---
sql-streaming-sqs/README.md | 12 +++++++++++
.../apache/spark/sql/streaming/sqs/SqsClient.scala | 23 ++++++++++++++++++++--
.../spark/sql/streaming/sqs/SqsSourceOptions.scala | 16 +++++++++++++++
.../sql/streaming/sqs/SqsSourceOptionsSuite.scala | 2 ++
4 files changed, 51 insertions(+), 2 deletions(-)
diff --git a/sql-streaming-sqs/README.md b/sql-streaming-sqs/README.md
index b0555d6..8dd8a54 100644
--- a/sql-streaming-sqs/README.md
+++ b/sql-streaming-sqs/README.md
@@ -63,6 +63,18 @@ shouldSortFiles|true|whether to sort files based on
timestamp while listing them
useInstanceProfileCredentials|false|Whether to use EC2 instance profile
credentials for connecting to Amazon SQS
maxFilesPerTrigger|no default value|maximum number of files to process in a
microbatch
maxFileAge|7d|Maximum age of a file that can be found in this directory
+messageWrapper|None| - 'None' if SQS contains a plain S3 message. <br/> - 'SNS'
if SQS contains an S3 notification message that came from SNS. <br/> Please see
the 'Use multiple consumers' section for more details
+
+## Use multiple consumers
+
+An SQS queue cannot be read by multiple consumers. <br/>
+If an S3 path should be listened to by multiple applications, the following
approach is recommended: S3 -> SNS -> SQS:
+1. Create multiple SQS queues. Each application listens to one SQS queue.
+2. Create one SNS topic.
+3. Once an S3 notification event is pushed to the SNS topic, it will be
delivered to each SQS queue.
+
+Thus, one S3 path can be processed by multiple applications. <br/>
+
## Example
diff --git
a/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
index 1d9bb97..72b4568 100644
---
a/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
+++
b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
@@ -28,7 +28,7 @@ import com.amazonaws.services.sqs.{AmazonSQS,
AmazonSQSClientBuilder}
import com.amazonaws.services.sqs.model.{DeleteMessageBatchRequestEntry,
Message, ReceiveMessageRequest}
import org.apache.hadoop.conf.Configuration
import org.json4s.{DefaultFormats, MappingException}
-import org.json4s.JsonAST.JValue
+import org.json4s.JsonAST.{JNothing, JValue}
import org.json4s.jackson.JsonMethods.parse
import org.apache.spark.SparkException
@@ -131,13 +131,32 @@ class SqsClient(sourceOptions: SqsSourceOptions,
}
}
+ private def tryToParseSNS(parsedBody: JValue): JValue = {
+ implicit val formats = DefaultFormats
+ parsedBody \ "Message" match {
+ case JNothing => throw new MappingException("Original message does not
look like SNS one. " +
+ "Please check your setup and make sure it is S3 notification event
coming from SNS")
+ case value => parse(value.extract[String])
+ }
+ }
+
+ private def extractS3Message(parsedBody: JValue): JValue = {
+ sourceOptions.messageWrapper match {
+ case sourceOptions.S3MessageWrapper.None => parsedBody
+ case sourceOptions.S3MessageWrapper.SNS => tryToParseSNS(parsedBody)
+ }
+ }
+
private def parseSqsMessages(messageList: Seq[Message]): Seq[(String, Long,
String)] = {
val errorMessages = scala.collection.mutable.ListBuffer[String]()
val parsedMessages = messageList.foldLeft(Seq[(String, Long, String)]()) {
(list, message) =>
implicit val formats = DefaultFormats
try {
val messageReceiptHandle = message.getReceiptHandle
- val messageJson = parse(message.getBody).extract[JValue]
+
+ val parsedBody: JValue = parse(message.getBody).extract[JValue]
+ val messageJson = extractS3Message(parsedBody)
+
val bucketName = (
messageJson \ "Records" \ "s3" \ "bucket" \
"name").extract[Array[String]].head
val eventName = (messageJson \ "Records" \
"eventName").extract[Array[String]].head
diff --git
a/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala
b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala
index a4c0cc1..0c2eda0 100644
---
a/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala
+++
b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala
@@ -28,6 +28,15 @@ import org.apache.spark.util.Utils
*/
class SqsSourceOptions(parameters: CaseInsensitiveMap[String]) extends Logging
{
+ object S3MessageWrapper extends Enumeration {
+ type MessageFormat = Value
+ val None, SNS = Value
+
+ def withNameOpt(opt: String): Option[Value] = {
+ values.find(_.toString.toLowerCase == opt.toLowerCase)
+ }
+ }
+
def this(parameters: Map[String, String]) =
this(CaseInsensitiveMap(parameters))
val maxFilesPerTrigger: Option[Int] =
parameters.get("maxFilesPerTrigger").map { str =>
@@ -92,6 +101,13 @@ class SqsSourceOptions(parameters:
CaseInsensitiveMap[String]) extends Logging {
throw new IllegalArgumentException("Specifying file format is mandatory
with sqs source")
}
+ val messageWrapper: S3MessageWrapper.Value =
parameters.get("messageWrapper").map( str =>
+ S3MessageWrapper.withNameOpt(str).getOrElse({
+ throw new IllegalArgumentException(s"Invalid value '$str' for option
'messageWrapper', " +
+ s"must be one of [${S3MessageWrapper.values.mkString(", ")}]")
+ })
+ ).getOrElse(S3MessageWrapper.None)
+
val ignoreFileDeletion: Boolean = withBooleanParameter("ignoreFileDeletion",
false)
/**
diff --git
a/sql-streaming-sqs/src/test/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptionsSuite.scala
b/sql-streaming-sqs/src/test/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptionsSuite.scala
index 6382fb1..e43bae3 100644
---
a/sql-streaming-sqs/src/test/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptionsSuite.scala
+++
b/sql-streaming-sqs/src/test/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptionsSuite.scala
@@ -74,6 +74,8 @@ class SqsSourceOptionsSuite extends StreamTest {
"for option 'shouldSortFiles', must be true or false")
testBadOptions("useInstanceProfileCredentials" -> "x")("Invalid value 'x'
" +
"for option 'useInstanceProfileCredentials', must be true or false")
+ testBadOptions("messageWrapper" -> "x")("Invalid value 'x' " +
+ "for option 'messageWrapper', must be one of [none, sns]")
}