Re: Writing Custom Spark Readers and Writers

2022-04-06 Thread daniel queiroz
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/connector/read/index.html
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/connector/write/index.html

https://developer.here.com/documentation/java-scala-dev/dev_guide/spark-connector/index.html
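
For a concrete starting point, here is a minimal sketch of the batch read
path from the DataSource V2 API those pages describe. The class names, the
fixed schema and the hard-coded five rows are illustrative only; a sink
would additionally implement SupportsWrite/WriteBuilder, and a streaming
source would implement Scan.toMicroBatchStream.

import java.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Entry point, used as spark.read.format(classOf[DemoSource].getName).load()
class DemoSource extends TableProvider {
  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    StructType(Seq(StructField("id", LongType, nullable = false)))

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = new DemoTable(schema)
}

class DemoTable(tableSchema: StructType) extends Table with SupportsRead {
  override def name(): String = "demo"
  override def schema(): StructType = tableSchema
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ)

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    new ScanBuilder with Scan with Batch {
      override def build(): Scan = this
      override def readSchema(): StructType = tableSchema
      override def toBatch(): Batch = this
      // a single partition; a real source would split the work here
      override def planInputPartitions(): Array[InputPartition] =
        Array[InputPartition](DemoPartition())
      override def createReaderFactory(): PartitionReaderFactory = new DemoReaderFactory
    }
}

case class DemoPartition() extends InputPartition

// Runs on executors, so it must be serializable and self-contained.
class DemoReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    new PartitionReader[InternalRow] {
      private var i = -1L
      override def next(): Boolean = { i += 1; i < 5 } // emits ids 0..4
      override def get(): InternalRow = InternalRow(i)
      override def close(): Unit = ()
    }
}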

Thanks,

Daniel Queiroz
81 996289671


On Wed, Apr 6, 2022 at 03:57, Dyanesh Varun <dyaneshvarun...@gmail.com> wrote:

> Hey team,
>
> Can you please share some documentation/blogs on how to write custom
> sources and sinks for both streaming and static datasets?
>
> Thanks in advance
> Dyanesh Varun
>
>


Re: Spark salesforce connector

2021-11-25 Thread daniel queiroz
    + domain + """","sub":"""" + userId + """","exp":"""" + expiretime + """"}"""
  val encodedpayload = Base64.encodeBase64URLSafeString(payload.getBytes("utf-8"))
  val message = encodedHeader + "." + encodedpayload

  /*
  *  assertion
  */

  val grant_type = "urn:ietf:params:oauth:grant-type:jwt-bearer"
  val result = encriptParam(message, readPrivateKey(private_key))
  val assertion = message + "." + result


  /*
  *  access token
  */
  def retrieveAccessToken(): String = {

    val tokenresponse = Http(tokenURL).timeout(connTimeoutMs = 2, readTimeoutMs = 60)
      .param("grant_type", grant_type)
      .param("assertion", assertion)
      .header("Content-Type", "application/x-www-form-urlencoded")
      .header("Charset", "UTF-8").asString

    val tokends = spark.read.json(Seq(tokenresponse.body).toDS())
    val token = tokends.select("access_token").first().mkString("")

    token
  }


  /*
  *  create query job
  */
  def createQueryJob(selectQuery: String, token: String): String = {

    val createjobResp = Http(sourceURL).timeout(connTimeoutMs = 2, readTimeoutMs = 60)
      .postData(s"""{"columnDelimiter": "PIPE", "operation": "query", "query": "${selectQuery}"}""")
      .header("Authorization", "Bearer " + token)
      .header("Content-Type", "application/json")
      .header("Charset", "UTF-8").asString

    val job = spark.read.json(Seq(createjobResp.body).toDS())
    val jobId = job.select("id").first().mkString("")

    jobId
  }


  /*
  *  returns true if query job is completed
  */
  def jobIsCompleted(jobId: String, token: String): Boolean = {

    val jobStatus = Http(sourceURL + "/" + jobId).timeout(connTimeoutMs = 2, readTimeoutMs = 60)
      .header("Authorization", "Bearer " + token)
      .header("Content-Type", "application/json")
      .header("Charset", "UTF-8").asString

    val statusresponseBody = spark.read.json(Seq(jobStatus.body).toDS())
    val status = statusresponseBody.select("state").first().mkString("")

    status match {
      case "Completed" | "JobComplete" | "CompletedWithWarnings" => true
      case "Failed" => false
      case _ =>
        Thread.sleep(2000)
        jobIsCompleted(jobId, token)
    }
  }

  /*
  *  retry func with exponential backoff until it returns true or the timeout elapses
  */
  private def retryWithExponentialBackoff(
      func: () => Boolean,
      timeoutDuration: FiniteDuration,
      initSleepInterval: FiniteDuration,
      maxSleepInterval: FiniteDuration): Boolean = {

    val timeout = timeoutDuration.toMillis
    var waited = 0L
    var sleepInterval = initSleepInterval.toMillis
    var done = false

    do {
      done = func()
      if (!done) {
        sleepInterval = math.min(sleepInterval * 2, maxSleepInterval.toMillis)

        var sleepTime = math.min(sleepInterval, timeout - waited)
        if (sleepTime < 1L) {
          sleepTime = 1
        }
        Thread.sleep(sleepTime)
        waited += sleepTime
      }
    } while (!done && waited < timeout)

    done
  }

  /*
  *  wait until the job is completed
  */
  def awaitJobCompleted(jobId: String, token: String): Boolean = {

    // units assumed: a 60 second overall budget with a 1 second cap on the backoff interval
    val timeoutDuration = FiniteDuration(60L, SECONDS)
    val initSleepIntervalDuration = FiniteDuration(200L, MILLISECONDS)
    val maxSleepIntervalDuration = FiniteDuration(1L, SECONDS)
    var completed = false

    retryWithExponentialBackoff(() => {
      completed = jobIsCompleted(jobId, token)
      completed
    }, timeoutDuration, initSleepIntervalDuration, maxSleepIntervalDuration)

    completed
  }


  /*
  *  retrieve query data
  */
  def retrieveQueryResults(jobId: String, token: String): DataFrame = {

    val result = Http(sourceURL + "/" + jobId + "/results?maxRecords=50").timeout(connTimeoutMs = 2, readTimeoutMs = 60)
      .header("Authorization", "Bearer " + token)
      .asString

    val resultBody = result.body
    val csvData: Dataset[String] = spark.sparkContext.parallelize(resultBody.stripMargin.lines.toList).toDS()
    val dsResult = spark.read.option("header", true).option("inferSchema", true).option("sep", "|").csv(csvData)
    dsResult
  }
}
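
A sketch of how these helpers could be chained end to end; the SOQL query
and the error handling below are illustrative, not part of the code above:

  val token = retrieveAccessToken()
  val jobId = createQueryJob("SELECT Id, Name FROM Account", token)

  if (awaitJobCompleted(jobId, token)) {
    val accounts: DataFrame = retrieveQueryResults(jobId, token)
    accounts.show(20, truncate = false)
  } else {
    throw new RuntimeException(s"Salesforce Bulk API job $jobId did not complete in time")
  }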


Regards,

Daniel Queiroz
+55 81 996289671


On Thu, Nov 25, 2021 at 04:09, Atlas - Samir Souidi <ssou...@atlasdataservices.com> wrote:

> Dear all,
>
> Do you know if there is any spark connector to SalesForce?
>
> Thanks
>
> Sam
>


Re: Optimizing SQL Query

2015-03-06 Thread daniel queiroz
Dude,

please attach the execution plan of the query and details about the
indexes.
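
For reference, a minimal sketch of one way to print that plan, assuming a
Spark SQL version where DataFrame.explain is available and the query is
issued through sqlContext.sql as in the quoted message:

  val df = sqlContext.sql("""SELECT dw.DAY_OF_WEEK, dw.HOUR, ... """) // the full query below
  df.explain(true) // prints the logical and physical plans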



2015-03-06 9:07 GMT-03:00 anu <anamika.guo...@gmail.com>:

 I have a query like the one below. Could you give me some pointers on how
 to start optimizing it with Spark SQL?

 sqlContext.sql("""
   SELECT dw.DAY_OF_WEEK, dw.HOUR, avg(dw.SDP_USAGE) AS AVG_SDP_USAGE
   FROM (
         SELECT sdp.WID, DAY_OF_WEEK, HOUR, SUM(INTERVAL_VALUE) AS SDP_USAGE
         FROM (
               SELECT *
               FROM date_d dd JOIN interval_f intf
                 ON intf.DATE_WID = dd.WID
               WHERE intf.DATE_WID >= 20141116 AND intf.DATE_WID <= 20141125
                 AND CAST(INTERVAL_END_TIME AS STRING) >= '2014-11-16 00:00:00.000'
                 AND CAST(INTERVAL_END_TIME AS STRING) <= '2014-11-26 00:00:00.000'
                 AND MEAS_WID = 3
              ) test JOIN sdp_d sdp
             ON test.SDP_WID = sdp.WID
         WHERE sdp.UDC_ID = 'SP-1931201848'
         GROUP BY sdp.WID, DAY_OF_WEEK, HOUR, sdp.UDC_ID
        ) dw
   GROUP BY dw.DAY_OF_WEEK, dw.HOUR
 """)



 Currently the query takes about 15 minutes to execute; interval_f holds
 approximately 170 GB of raw data, date_d about 170 MB, and sdp_d about 490 MB.


