[ https://issues.apache.org/jira/browse/BAHIR-110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16087380#comment-16087380 ]
ASF GitHub Bot commented on BAHIR-110:
--------------------------------------
Github user mayya-sharipova commented on a diff in the pull request:
https://github.com/apache/bahir/pull/45#discussion_r127467315
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---
@@ -98,29 +99,81 @@ class DefaultSource extends RelationProvider
     val config: CloudantConfig =
       JsonStoreConfigManager.getConfig(sqlContext, parameters)
-    var allDocsDF: DataFrame = null
+    var dataFrame: DataFrame = null
     val schema: StructType = {
       if (inSchema != null) {
         inSchema
-      } else {
-        val df = if (config.getSchemaSampleSize() ==
-          JsonStoreConfigManager.ALL_DOCS_LIMIT &&
+      } else if (!config.isInstanceOf[CloudantChangesConfig]
+        || config.viewName != null || config.indexName != null) {
+        val df = if (config.getSchemaSampleSize ==
+          JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
           config.viewName == null
           && config.indexName == null) {
           val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-          allDocsDF = sqlContext.read.json(cloudantRDD)
-          allDocsDF
+          dataFrame = sqlContext.read.json(cloudantRDD)
+          dataFrame
         } else {
           val dataAccess = new JsonStoreDataAccess(config)
           val aRDD = sqlContext.sparkContext.parallelize(
-            dataAccess.getMany(config.getSchemaSampleSize()))
+            dataAccess.getMany(config.getSchemaSampleSize))
           sqlContext.read.json(aRDD)
         }
         df.schema
+      } else {
+        /* Create a streaming context to handle transforming docs in
+         * larger databases into Spark datasets
+         */
+        val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(10))
+        val streamingMap = {
+          val selector = config.asInstanceOf[CloudantChangesConfig].getSelector
+          if (selector != null) {
+            Map(
+              "database" -> config.getDbname,
+              "selector" -> selector
+            )
+          } else {
+            Map(
+              "database" -> config.getDbname
+            )
+          }
+        }
+
+        val changes = ssc.receiverStream(
+          new CloudantReceiver(sqlContext.sparkContext.getConf, streamingMap))
+        changes.persist(config.asInstanceOf[CloudantChangesConfig]
+          .getStorageLevelForStreaming)
+
+        // Global RDD that's created from union of all RDDs
+        var globalRDD = ssc.sparkContext.emptyRDD[String]
+
+        logger.info("Loading data from Cloudant using "
+          + config.asInstanceOf[CloudantChangesConfig].getContinuousChangesUrl)
+
--- End diff ---
@emlaver Here, while trying to load a db, I am getting the message "Loading
data from Cloudant using
https://XXXX.cloudant.com/n_airportcodemapping/_changes?include_docs=true&feed=continuous&heartbeat=3000".
We should NOT load data into Spark SQL using the `continuous` feed, which for a
constantly updating database may never finish. The whole point of loading a db
into Spark SQL is to load a snapshot of the db at a particular point in time.
Use the `normal` feed here.
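For illustration only, a minimal sketch of the distinction (not the actual `CloudantChangesConfig` API; the helper name, host, and database are made up): a snapshot-style request would use `feed=normal`, so the `_changes` response terminates once the current state of the database has been read.

```scala
object ChangesUrlSketch {
  // Hypothetical helper (not part of CloudantChangesConfig): builds a _changes
  // request that returns the current snapshot of the database and then ends
  // (feed=normal), rather than a continuous feed that may never terminate for
  // a constantly updated database.
  def snapshotChangesUrl(host: String, db: String): String =
    s"https://$host/$db/_changes?include_docs=true&feed=normal"

  def main(args: Array[String]): Unit = {
    // Made-up host and database name, for illustration only
    println(snapshotChangesUrl("example.cloudant.com", "n_airportcodemapping"))
    // prints: https://example.cloudant.com/n_airportcodemapping/_changes?include_docs=true&feed=normal
  }
}
```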
> Replace use of _all_docs API with _changes API in all receivers
> ---------------------------------------------------------------
>
> Key: BAHIR-110
> URL: https://issues.apache.org/jira/browse/BAHIR-110
> Project: Bahir
> Issue Type: Improvement
> Reporter: Esteban Laver
> Original Estimate: 216h
> Remaining Estimate: 216h
>
> Today we use the _changes API for the Spark streaming receiver and the _all_docs API
> for the non-streaming receiver. The _all_docs API supports parallel reads (using
> offset and range), but the performance of the _changes API is still better in most
> cases (even with single-threaded support).
> With this ticket we want to:
> a) re-implement all receivers using the _changes API
> b) compare performance between the two implementations based on _changes and
> _all_docs
> Based on the results in b) we could decide to either
> - replace the _all_docs implementation with the _changes-based implementation OR
> - allow customers to pick one (with solid documentation about pros and
> cons)
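For reference, a rough sketch of the two request shapes being compared; this is not Bahir code, and the helper names, host, and database are made up:

```scala
object ReceiverEndpointsSketch {
  // _all_docs can be read in parallel by giving each task its own offset/range
  // (skip and limit), so several partitions can be fetched at once.
  def allDocsPage(host: String, db: String, skip: Int, limit: Int): String =
    s"https://$host/$db/_all_docs?include_docs=true&skip=$skip&limit=$limit"

  // _changes is typically consumed as a single stream; with feed=normal the
  // request returns one snapshot of the database and then completes.
  def changesSnapshot(host: String, db: String): String =
    s"https://$host/$db/_changes?include_docs=true&feed=normal"

  def main(args: Array[String]): Unit = {
    println(allDocsPage("example.cloudant.com", "n_airportcodemapping", 0, 1000))
    println(changesSnapshot("example.cloudant.com", "n_airportcodemapping"))
  }
}
```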