[
https://issues.apache.org/jira/browse/BAHIR-110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080442#comment-16080442
]
ASF GitHub Bot commented on BAHIR-110:
--------------------------------------
Github user ricellis commented on a diff in the pull request:
https://github.com/apache/bahir/pull/45#discussion_r126438628
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---
@@ -98,29 +99,89 @@ class DefaultSource extends RelationProvider
     val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters)
 
-    var allDocsDF: DataFrame = null
+    var dataFrame: DataFrame = null
 
     val schema: StructType = {
       if (inSchema != null) {
         inSchema
-      } else {
-        val df = if (config.getSchemaSampleSize() ==
-          JsonStoreConfigManager.ALL_DOCS_LIMIT &&
+      } else if (!config.isInstanceOf[CloudantChangesConfig]
+          || config.viewName != null || config.indexName != null) {
+        val df = if (config.getSchemaSampleSize ==
+          JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
           config.viewName == null
           && config.indexName == null) {
           val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-          allDocsDF = sqlContext.read.json(cloudantRDD)
-          allDocsDF
+          dataFrame = sqlContext.read.json(cloudantRDD)
+          dataFrame
         } else {
           val dataAccess = new JsonStoreDataAccess(config)
           val aRDD = sqlContext.sparkContext.parallelize(
-            dataAccess.getMany(config.getSchemaSampleSize()))
+            dataAccess.getMany(config.getSchemaSampleSize))
           sqlContext.read.json(aRDD)
         }
         df.schema
+      } else {
+        /* Create a streaming context to handle transforming docs in
+         * larger databases into Spark datasets
+         */
+        /* Allow the raw data and persisted RDDs to be accessible outside
+         * of the streaming context.
+         * See https://spark.apache.org/docs/latest/configuration.html
+         * for more details.
+         */
+        sqlContext.sparkSession.conf.set("spark.streaming.unpersist", "false")
+
+        val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(10))
+        val streamingMap = {
+          val selector = config.asInstanceOf[CloudantChangesConfig].getSelector
+          if (selector != null) {
+            Map(
+              "database" -> config.getDbname,
+              "selector" -> selector
+            )
+          } else {
+            Map(
+              "database" -> config.getDbname
+            )
+          }
+        }
+
+        val changes = ssc.receiverStream(
+          new CloudantReceiver(sqlContext.sparkContext.getConf, streamingMap))
+        changes.persist(config.asInstanceOf[CloudantChangesConfig]
+          .getStorageLevelForStreaming)
+
+        // Global RDD that's created from union of all RDDs
+        var globalRDD = ssc.sparkContext.emptyRDD[String]
+
+        logger.info("Loading data from Cloudant using "
+          + config.asInstanceOf[CloudantChangesConfig].getContinuousChangesUrl)
+
+        // Collect and union each RDD to convert all RDDs to a DataFrame
+        changes.foreachRDD((rdd: RDD[String]) => {
+          if (!rdd.isEmpty()) {
+            if (globalRDD != null) {
+              // Union RDDs in foreach loop
+              globalRDD = globalRDD.union(rdd)
+            } else {
+              globalRDD = rdd
+            }
+          } else {
+            // Convert final global RDD[String] to DataFrame
+            dataFrame = sqlContext.sparkSession.read.json(globalRDD)
+            ssc.stop(stopSparkContext = false, stopGracefully = false)
+          }
+        })
+
+        ssc.start
+        // run streaming until all docs from continuous feed are received
+        ssc.awaitTermination
+        // ssc.stop(stopSparkContext = false, stopGracefully = false)
--- End diff ---
Commented-out code?
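
For context, a minimal, self-contained sketch (editor's addition, not the PR's code) of the pattern this hunk implements: union each micro-batch RDD from the stream, stop the streaming context when an empty batch signals that the feed is drained, then read the accumulated RDD as a DataFrame. A queueStream stands in for CloudantReceiver so it runs without Cloudant; the object name and sample documents are hypothetical. It also illustrates the review point: stop is already invoked in the empty-batch branch, so a second ssc.stop(...) after awaitTermination would be redundant.

    import scala.collection.mutable
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Encoders, SparkSession}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object ChangesUnionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]").appName("changes-union-sketch").getOrCreate()
        // As in the PR: keep batch RDDs cached after their interval completes.
        spark.conf.set("spark.streaming.unpersist", "false")

        val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
        // Stand-in for CloudantReceiver: two batches of JSON docs, then an
        // empty batch that marks the end of the feed in this sketch.
        val queue = mutable.Queue(
          spark.sparkContext.parallelize(Seq("""{"_id":"a","n":1}""")),
          spark.sparkContext.parallelize(Seq("""{"_id":"b","n":2}""")),
          spark.sparkContext.emptyRDD[String])
        val changes = ssc.queueStream(queue, oneAtATime = true)

        // Union every non-empty micro-batch into one RDD.
        var globalRDD: RDD[String] = ssc.sparkContext.emptyRDD[String]
        changes.foreachRDD { rdd =>
          if (!rdd.isEmpty()) {
            // No null check needed: globalRDD starts as an empty RDD.
            globalRDD = globalRDD.union(rdd)
          } else {
            // Feed drained: stop streaming, keep the SparkContext alive.
            ssc.stop(stopSparkContext = false, stopGracefully = false)
          }
        }

        ssc.start()
        ssc.awaitTermination() // returns once stop() above has been called
        // A second ssc.stop() here would be redundant.
        val df = spark.read.json(spark.createDataset(globalRDD)(Encoders.STRING))
        df.show()
        spark.stop()
      }
    }

Unlike the hunk, the sketch converts the unioned RDD to a DataFrame after awaitTermination returns rather than inside foreachRDD, which keeps the Spark SQL job out of the callback that is stopping the context; the non-graceful stop from inside foreachRDD itself mirrors the PR's approach.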
> Replace use of _all_docs API with _changes API in all receivers
> ---------------------------------------------------------------
>
> Key: BAHIR-110
> URL: https://issues.apache.org/jira/browse/BAHIR-110
> Project: Bahir
> Issue Type: Improvement
> Reporter: Esteban Laver
> Original Estimate: 216h
> Remaining Estimate: 216h
>
> Today we use the _changes API for the Spark streaming receiver and the
> _all_docs API for the non-streaming receiver. The _all_docs API supports
> parallel reads (using offset and range), but the performance of the _changes
> API is still better in most cases (even with single-threaded support); a
> sketch contrasting the two access patterns follows this description.
> With this ticket we want to:
> a) re-implement all receivers using the _changes API
> b) compare the performance of the two implementations, one based on _changes
> and one on _all_docs
> Based on the results in b) we could decide to either
> - replace the _all_docs implementation with the _changes-based
> implementation, OR
> - allow customers to pick one (with solid documentation about the pros and
> cons)
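
To make the trade-off in the description above concrete, here is a minimal sketch (editor's addition; hypothetical account, database, and page size, with no authentication, error handling, or JSON parsing): _all_docs pages can be fetched in parallel with limit/skip, while _changes is consumed as a single sequential feed.

    import scala.io.Source

    object AllDocsVsChangesSketch {
      // Hypothetical Cloudant account and database.
      val base = "https://myaccount.cloudant.com/mydb"

      // _all_docs supports parallel reads: each caller fetches its own page
      // with limit/skip (key ranges via startkey/endkey work as well).
      def allDocsPage(limit: Int, skip: Int): String =
        Source.fromURL(s"$base/_all_docs?include_docs=true&limit=$limit&skip=$skip").mkString

      // _changes is one feed, read sequentially (single-threaded here).
      def changesFeed(): String =
        Source.fromURL(s"$base/_changes?include_docs=true&feed=normal").mkString

      def main(args: Array[String]): Unit = {
        // Four pages of 1000 docs fetched concurrently...
        val pages = (0 until 4).par.map(i => allDocsPage(limit = 1000, skip = i * 1000))
        // ...versus one pass over the entire changes feed.
        val feed = changesFeed()
        println(s"pages: ${pages.map(_.length).sum} chars, feed: ${feed.length} chars")
      }
    }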
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)