[
https://issues.apache.org/jira/browse/BAHIR-97?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957651#comment-15957651
]
ASF GitHub Bot commented on BAHIR-97:
-------------------------------------
Github user lresende commented on a diff in the pull request:
https://github.com/apache/bahir/pull/38#discussion_r110021902
--- Diff:
sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.sql.streaming.akka
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SharedSparkContext, SparkFunSuite}
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
+import org.apache.spark.sql.execution.streaming.LongOffset
+
+import org.apache.bahir.utils.BahirUtils
+
class AkkaStreamSourceSuite extends SparkFunSuite with SharedSparkContext
  with BeforeAndAfter {

  protected var akkaTestUtils: AkkaTestUtils = _

  // Scratch directory backing the Akka persistence store used by the source.
  protected val tempDir: File =
    new File(System.getProperty("java.io.tmpdir") + "/spark-akka-persistence")

  akkaTestUtils = new AkkaTestUtils
  akkaTestUtils.setup()

  before {
    tempDir.mkdirs()
  }

  after {
    Persistence.close()
    BahirUtils.recursiveDeleteDir(tempDir)
  }

  protected val tmpDir: String = tempDir.getAbsolutePath

  /**
   * Builds a streaming DataFrame backed by the Akka stream source provider.
   *
   * @param dir base directory under which the checkpoint and persistence
   *            sub-directories are created (defaults to [[tmpDir]]).
   * @return the SQLContext in use and the streaming DataFrame reading from
   *         the test feeder actor.
   */
  protected def createStreamingDataframe(dir: String = tmpDir): (SQLContext, DataFrame) = {

    // Obtain the SQLContext via SparkSession rather than `new SQLContext(sc)`,
    // which is deprecated since Spark 2.0 (requested in review feedback).
    // getOrCreate() reuses the SparkContext supplied by SharedSparkContext.
    val sqlContext: SQLContext =
      SparkSession.builder().config(sc.getConf).getOrCreate().sqlContext

    sqlContext.setConf("spark.sql.streaming.checkpointLocation", dir + "/checkpoint")

    val dataFrame: DataFrame =
      sqlContext.readStream.format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
        .option("urlOfPublisher", akkaTestUtils.getFeederActorUri())
        .option("persistenceDirPath", dir + "/persistence").load()
    (sqlContext, dataFrame)
  }
}
+
+class BasicAkkaSourceSuite extends AkkaStreamSourceSuite {
+
+ private def writeStreamResults(sqlContext: SQLContext, dataFrame:
DataFrame,
+ waitDuration: Long): Boolean = {
+ import sqlContext.implicits._
+ dataFrame.as[(String, Timestamp)].writeStream.format("parquet")
+ .start(s"$tmpDir/parquet/t.parquet").awaitTermination(waitDuration)
+ }
+
+ private def readBackSreamingResults(sqlContext: SQLContext):
mutable.Buffer[String] = {
+ import sqlContext.implicits._
+ val asList =
+ sqlContext.read.schema(AkkaStreamConstants.SCHEMA_DEFAULT)
+ .parquet(s"$tmpDir/parquet/t.parquet").as[(String,
Timestamp)].map(_._1)
+ .collectAsList().asScala
+ asList
+ }
+
+ test("basic usage") {
+ val message = "Akka is a reactive framework"
+
+ akkaTestUtils.setMessage(message)
+ akkaTestUtils.setCountOfMessages(1)
+
+ val (sqlContext: SQLContext, dataFrame: DataFrame) =
createStreamingDataframe()
+
+ writeStreamResults(sqlContext, dataFrame, 10000)
+
+ val resultBuffer: mutable.Buffer[String] =
readBackSreamingResults(sqlContext)
+
+ assert(resultBuffer.size === 1)
+ assert(resultBuffer.head === message)
+ }
+
+ test("Send and receive 100 messages.") {
+ val message = "Akka is a reactive framework"
+
+ akkaTestUtils.setMessage(message)
+ akkaTestUtils.setCountOfMessages(100)
+
+ val (sqlContext: SQLContext, dataFrame: DataFrame) =
createStreamingDataframe()
+
+ writeStreamResults(sqlContext, dataFrame, 10000)
+
+ val resultBuffer: mutable.Buffer[String] =
readBackSreamingResults(sqlContext)
+
+ assert(resultBuffer.size === 100)
+ assert(resultBuffer.head === message)
+ }
+
+ test("params not provided") {
+ val persistenceDirPath = tempDir.getAbsolutePath + "/persistence"
+
+ val provider = new AkkaStreamSourceProvider
+ val sqlContext: SQLContext = new SQLContext(sc)
+
--- End diff --
Could you please change this to use SparkSession instead of directly
instantiating a SQLContext?
> Akka as a streaming source for SQL Streaming.
> ---------------------------------------------
>
> Key: BAHIR-97
> URL: https://issues.apache.org/jira/browse/BAHIR-97
> Project: Bahir
> Issue Type: New Feature
> Components: Spark SQL Data Sources
> Affects Versions: Spark-2.1.0
> Reporter: Subhobrata Dey
>
> Hello,
> This issue is created to propose the addition of Akka compatible streaming
> source for Spark SQL Streaming.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)