+ domain +
"""","sub":"""" + userId + """","exp":"""" + expiretime + """"}"""
val encodedPayload =
  Base64.encodeBase64URLSafeString(payload.getBytes("utf-8"))
val message = encodedHeader + "." + encodedPayload
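// aside: the same claims payload reads more cleanly with string interpolation
// (a sketch; clientId stands in for the issuer value concatenated on the
// truncated first line above):
//   val payload =
//     s"""{"iss":"$clientId","aud":"$domain","sub":"$userId","exp":"$expiretime"}"""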
/*
* assertion
*/
val grantType = "urn:ietf:params:oauth:grant-type:jwt-bearer"
// sign header.payload with the private key (encriptParam presumably
// produces the JWT's RS256 signature)
val signature = encriptParam(message, readPrivateKey(private_key))
val assertion = message + "." + signature
/*
* access token
*/
def retrieveAccessToken(): String = {
  // POST a form-encoded body, as the OAuth token endpoint expects
  val tokenResponse = Http(tokenURL)
    .postForm
    .timeout(connTimeoutMs = 2000, readTimeoutMs = 60000) // 2 s connect, 60 s read
    .param("grant_type", grantType)
    .param("assertion", assertion)
    .header("Content-Type", "application/x-www-form-urlencoded")
    .header("Charset", "UTF-8")
    .asString
  val tokenDS = spark.read.json(Seq(tokenResponse.body).toDS())
  tokenDS.select("access_token").first().mkString("")
}
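// for reference, the token endpoint answers with a small JSON document along
// the lines of {"access_token":"...","token_type":"Bearer",...} (standard
// OAuth 2.0), which is why spark.read.json + select("access_token") is
// enough to extract the token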
/*
* create query job
*/
def createQueryJob(selectQuery: String, token: String): String = {
  val createJobResp = Http(sourceURL)
    .timeout(connTimeoutMs = 2000, readTimeoutMs = 60000)
    .postData(s"""{"columnDelimiter": "PIPE", "operation": "query", "query": "${selectQuery}"}""")
    .header("Authorization", "Bearer " + token)
    .header("Content-Type", "application/json")
    .header("Charset", "UTF-8")
    .asString
  val job = spark.read.json(Seq(createJobResp.body).toDS())
  job.select("id").first().mkString("")
}
/*
* returns true if query job is completed
*/
def jobIsCompleted(jobId: String, token: String): Boolean = {
  val jobStatus = Http(sourceURL + "/" + jobId)
    .timeout(connTimeoutMs = 2000, readTimeoutMs = 60000)
    .header("Authorization", "Bearer " + token)
    .header("Content-Type", "application/json")
    .header("Charset", "UTF-8")
    .asString
  val statusResponseBody = spark.read.json(Seq(jobStatus.body).toDS())
  val status = statusResponseBody.select("state").first().mkString("")
  status match {
    case "Completed" | "JobComplete" | "CompletedWithWarnings" => true
    case "Failed" => false
    case _ =>
      // job still running: wait and poll again (note this inner retry
      // overlaps with the retryWithExponentialBackoff helper below)
      Thread.sleep(2000)
      jobIsCompleted(jobId, token)
  }
}
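// caveat: if sourceURL targets the Bulk API 2.0 jobs endpoint, the documented
// query-job states are UploadComplete, InProgress, JobComplete, Aborted and
// Failed; the "Completed"/"CompletedWithWarnings" cases look like they come
// from the older Bulk API and may simply never match here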
/*
 * retry a check with exponential backoff until it returns true or the
 * timeout elapses
 */
private def retryWithExponentialBackoff(
    func: () => Boolean,
    timeoutDuration: FiniteDuration,
    initSleepInterval: FiniteDuration,
    maxSleepInterval: FiniteDuration): Boolean = {
  val timeout = timeoutDuration.toMillis
  var waited = 0L
  var sleepInterval = initSleepInterval.toMillis
  var done = false
  do {
    done = func()
    if (!done) {
      // double the interval on every miss, capped at maxSleepInterval
      sleepInterval = math.min(sleepInterval * 2, maxSleepInterval.toMillis)
      // sleep at least 1 ms, but never past the overall timeout
      val sleepTime = math.max(1L, math.min(sleepInterval, timeout - waited))
      Thread.sleep(sleepTime)
      waited += sleepTime
    }
  } while (!done && waited < timeout)
  done
}
/*
* wait until the job is completed
*/
def awaitJobCompleted(jobId: String, token: String): Boolean = {
  // 60 s overall budget, expressed in milliseconds
  val timeoutDuration = FiniteDuration(60000L, MILLISECONDS)
  val initSleepIntervalDuration = FiniteDuration(200L, MILLISECONDS)
  // the cap must be >= the initial interval for the backoff to grow
  val maxSleepIntervalDuration = FiniteDuration(10000L, MILLISECONDS)
  retryWithExponentialBackoff(() => jobIsCompleted(jobId, token),
    timeoutDuration, initSleepIntervalDuration, maxSleepIntervalDuration)
}
/*
* retrieve query data
*/
def retrieveQueryResults(jobId: String, token: String): DataFrame = {
  // note: maxRecords=50 returns only the first page of results; larger
  // result sets require following the Sforce-Locator response header
  val result = Http(sourceURL + "/" + jobId + "/results?maxRecords=50")
    .timeout(connTimeoutMs = 2000, readTimeoutMs = 60000)
    .header("Authorization", "Bearer " + token)
    .asString
  val csvData: Dataset[String] = result.body.linesIterator.toSeq.toDS()
  spark.read
    .option("header", true)
    .option("inferSchema", true)
    .option("sep", "|")
    .csv(csvData)
}
}
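
Putting the pieces together, a minimal driver might look like this (a sketch,
called from inside the object above; the SOQL query is just an example):

  val token = retrieveAccessToken()
  val jobId = createQueryJob("SELECT Id, Name FROM Account", token)
  if (awaitJobCompleted(jobId, token)) {
    val df = retrieveQueryResults(jobId, token)
    df.show(10, truncate = false)
  } else {
    println(s"query job $jobId did not complete in time")
  }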
Regards,
Daniel Queiroz
+55 81 996289671
On Thu, Nov 25, 2021 at 04:09 Atlas - Samir Souidi <
ssou...@atlasdataservices.com> wrote:
> Dear all,
>
> Do you know if there is any spark connector to SalesForce?
>
> Thanks
>
> Sam
>