[
https://issues.apache.org/jira/browse/BAHIR-110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16097921#comment-16097921
]
ASF GitHub Bot commented on BAHIR-110:
--------------------------------------
GitHub user emlaver commented on a diff in the pull request:
https://github.com/apache/bahir/pull/45#discussion_r128946219
--- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bahir.cloudant.internal
+
+import org.slf4j.{Logger, LoggerFactory}
+import play.api.libs.json.Json
+import scalaj.http._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+
+import org.apache.bahir.cloudant.CloudantChangesConfig
+import org.apache.bahir.cloudant.common._
+
+
+class ChangesReceiver(config: CloudantChangesConfig)
+ extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {
+
+ def onStart() {
+ // Start the thread that receives data over a connection
+ new Thread("Cloudant Receiver") {
+ override def run() { receive() }
+ }.start()
+ }
+
+ private def receive(): Unit = {
+ // Get total number of docs in database using _all_docs endpoint
+ val limit = new JsonStoreDataAccess(config)
+ .getTotalRows(config.getTotalUrl, queryUsed = false)
+
+ // Get continuous _changes url
+ val url = config.getChangesReceiverUrl.toString
+ val selector: String = {
+ "{\"selector\":" + config.getSelector + "}"
+ }
+
+ var count = 0
+ val clRequest: HttpRequest = config.username match {
+ case null =>
+ Http(url)
+ .postData(selector)
+ .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
+ .header("Content-Type", "application/json")
+ .header("User-Agent", "spark-cloudant")
+ case _ =>
+ Http(url)
+ .postData(selector)
+ .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
+ .header("Content-Type", "application/json")
+ .header("User-Agent", "spark-cloudant")
+ .auth(config.username, config.password)
+ }
+
+ clRequest.exec((code, headers, is) => {
+ if (code == 200) {
+ scala.io.Source.fromInputStream(is, "utf-8").getLines().foreach(line => {
+ if (line.length() > 0) {
+ val json = Json.parse(line)
+ val jsonDoc = (json \ "doc").getOrElse(null)
+ var doc = ""
+ if(jsonDoc != null) {
+ doc = Json.stringify(jsonDoc)
+ // Verify that doc is not empty and is not deleted
+ val deleted = (jsonDoc \ "_deleted").getOrElse(null)
+ if(!doc.isEmpty && deleted == null) {
+ store(doc)
+ count += 1
+ }
+ }
+ } else if (count >= limit) {
--- End diff --
Fixed in 8b40e38.
> Implement _changes API for non-streaming receiver
> -------------------------------------------------
>
> Key: BAHIR-110
> URL: https://issues.apache.org/jira/browse/BAHIR-110
> Project: Bahir
> Issue Type: Improvement
> Reporter: Esteban Laver
> Original Estimate: 216h
> Remaining Estimate: 216h
>
> Today we use the _changes API for the Spark streaming receiver and the
> _all_docs API for the non-streaming receiver. The _all_docs API supports
> parallel reads (using offset and range), but the _changes API still performs
> better in most cases (even with only single-threaded support).
> With this ticket we want to:
> a) implement the _changes API for non-streaming receivers
> b) allow customers to pick either the _all_docs (default) or the _changes API
> endpoint, with documentation about the pros and cons
> _changes performance details:
> Successfully loaded Cloudant docs (using a local cloudant-developer Docker
> image) into Spark (local standalone) with the following database sizes: 15GB
> (time: 8 1/2 mins), 20GB (17 mins), 46GB (25 mins), and 75GB (48 1/2 mins).
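To make the load times above easier to compare, here is a minimal sketch that converts each reported run into an average throughput. It assumes "8 1/2 mins" and "48 1/2 mins" mean 8.5 and 48.5 minutes. The object and method names (ChangesThroughput, throughput) are hypothetical and are not part of the Bahir code base.

```scala
// Hypothetical helper, not part of sql-cloudant: converts the load times
// quoted in the ticket into average throughput figures.
object ChangesThroughput {

  // (database size in GB, load time in minutes), as reported in the ticket.
  // Assumption: "8 1/2 mins" and "48 1/2 mins" are read as 8.5 and 48.5.
  val runs: Seq[(Double, Double)] =
    Seq((15.0, 8.5), (20.0, 17.0), (46.0, 25.0), (75.0, 48.5))

  // Average throughput of a single run, in GB per minute.
  def throughput(sizeGb: Double, minutes: Double): Double = sizeGb / minutes

  def main(args: Array[String]): Unit = {
    runs.foreach { case (sizeGb, minutes) =>
      val rate = throughput(sizeGb, minutes)
      println(f"$sizeGb%.0f GB in $minutes%.1f min: $rate%.2f GB/min")
    }
  }
}
```

Under these assumptions the four runs land between roughly 1.2 and 1.8 GB per minute, so the reported load time grows approximately in proportion to database size.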
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)