Thank you very much for your response. I asked an expert for help, and below
is the sample code, written against the sample SSB project, that I would like
to contribute to help anyone who has the same issue as me:
==============================
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.json4s.jackson.JsonMethods
import org.json4s.{DefaultFormats, Formats}
import java.io.{BufferedReader, DataOutputStream, InputStreamReader}
import java.net.{HttpURLConnection, URL}
import java.util.Base64
object APIKylinRunSQL {

  val KYLIN_QUERY_URL = "http://localhost:7070/kylin/api/query"
  val USER_NAME = "xxxxx"
  val PASSWORD = "yyyyy"
  val KYLIN_PROJECT = "learn_kylin"

  val spark = SparkSession.builder
    .master("local")
    .appName("Convert JSON to DataFrame")
    .getOrCreate()

  def main(args: Array[String]): Unit = {
    // Use a Seq of pairs instead of a Map: a Map would silently drop the
    // duplicate PART and P_LINEORDER keys, so two of these seven queries
    // would never run.
    val tablesAndQueries = Seq(
      "CUSTOMER" -> "SELECT * FROM SSB.CUSTOMER",
      "DATES" -> "SELECT * FROM SSB.DATES",
      "PART" -> "SELECT * FROM SSB.PART",
      "P_LINEORDER" -> "SELECT * FROM SSB.P_LINEORDER",
      "SUPPLIER" -> "SELECT * FROM SSB.SUPPLIER",
      "P_LINEORDER" -> "SELECT lo_orderdate, count(1) FROM SSB.P_LINEORDER GROUP BY lo_orderdate",
      "PART" -> "SELECT P_COLOR, count(1) FROM SSB.PART GROUP BY P_COLOR"
    )
    // number of times to run the whole query set
    val numberOfExecutions = 15
    for (i <- 1 to numberOfExecutions) {
      println(s"Executing round $i")
      for ((table, query) <- tablesAndQueries) {
        println(s"Executing query for table $table")
        println(query)
        executeQuery(query)
        // wait one second between queries
        Thread.sleep(1000)
      }
    }
    spark.stop()
  }

  def executeQuery(sqlQuery: String): Unit = {
    // Build the request body; note this assumes the SQL contains no double
    // quotes or newlines, which would break the embedded JSON.
    val queryJson =
      s"""
         |{
         |  "project": "$KYLIN_PROJECT",
         |  "sql": "$sqlQuery"
         |}
         |""".stripMargin
    // encode the username and password for basic authentication
    val encodedAuth =
      Base64.getEncoder.encodeToString(s"$USER_NAME:$PASSWORD".getBytes)
    val url = new URL(KYLIN_QUERY_URL)
    val connection = url.openConnection.asInstanceOf[HttpURLConnection]
    connection.setRequestMethod("POST")
    connection.setRequestProperty("Authorization", s"Basic $encodedAuth")
    connection.setRequestProperty("Content-Type", "application/json")
    connection.setRequestProperty("Accept", "application/json")
    connection.setDoOutput(true)
    // send the JSON body
    val writer = new DataOutputStream(connection.getOutputStream)
    writer.write(queryJson.getBytes("UTF-8"))
    writer.flush()
    writer.close()
    val responseCode = connection.getResponseCode
    if (responseCode == HttpURLConnection.HTTP_OK) {
      // read the whole response body
      val reader = new BufferedReader(new InputStreamReader(connection.getInputStream))
      val response = new StringBuilder
      var inputLine: String = null
      while ( {
        inputLine = reader.readLine()
        inputLine != null
      }) {
        response.append(inputLine)
      }
      reader.close()
      println("Result:")
      println(response.toString)
      connection.disconnect()
      // parse the JSON response
      implicit val formats: Formats = DefaultFormats
      val parsedJson = JsonMethods.parse(response.toString)
      val columns = (parsedJson \ "columnMetas").extract[List[Map[String, Any]]]
      // dynamically build the schema from the column names in the JSON;
      // every column is read as a string
      val schema = StructType(columns.map { col =>
        val columnName = col("name").asInstanceOf[String]
        StructField(columnName, StringType, nullable = true)
      })
      schema.printTreeString()
      // get the result rows from the JSON
      val data = (parsedJson \ "results").extract[List[List[Any]]]
      // convert the rows to an RDD[Row] and build the DataFrame
      val rowsRDD = spark.sparkContext.parallelize(
        data.map(row => Row.fromSeq(row.map(_.asInstanceOf[AnyRef]))))
      val df = spark.createDataFrame(rowsRDD, schema)
      df.show(20, false)
    } else {
      println(s"Error: $responseCode")
      connection.disconnect()
    }
  }
}
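
A couple of notes on the code above.

First, the schema treats every column as a string. If you need proper types,
one option is to cast the columns afterwards using the types Kylin reports in
columnMetas. This is only a sketch: the "columnTypeName" key is my assumption
about the response shape, so please verify it against the JSON your Kylin
version actually returns. It reuses the columns list and the df DataFrame
from executeQuery above:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DateType, DoubleType, LongType}

// Cast the all-string DataFrame using the column types Kylin reports.
// "columnTypeName" is an assumption about the response shape.
val typedDf = columns.foldLeft(df) { (acc, meta) =>
  val name = meta("name").asInstanceOf[String]
  meta.get("columnTypeName").map(_.toString.toUpperCase) match {
    case Some("BIGINT") | Some("INTEGER") | Some("INT") =>
      acc.withColumn(name, col(name).cast(LongType))
    case Some("DOUBLE") | Some("FLOAT") =>
      acc.withColumn(name, col(name).cast(DoubleType))
    case Some("DATE") =>
      acc.withColumn(name, col(name).cast(DateType))
    case _ => acc // leave VARCHAR and anything unrecognized as strings
  }
}
typedDf.printSchema()

Second, if you would rather avoid the manual JSON handling entirely, the JDBC
route Lu Cao pointed to below should also work: Spark's JDBC data source can
load the result directly through Kylin's JDBC driver. A minimal sketch,
assuming the kylin-jdbc driver is on the classpath and the usual
jdbc:kylin://host:port/project URL format:

// Load a query result straight into a DataFrame via Kylin's JDBC driver.
val jdbcDf = spark.read
  .format("jdbc")
  .option("url", s"jdbc:kylin://localhost:7070/$KYLIN_PROJECT")
  .option("driver", "org.apache.kylin.jdbc.Driver")
  .option("query", "SELECT P_COLOR, count(1) FROM SSB.PART GROUP BY P_COLOR")
  .option("user", USER_NAME)
  .option("password", PASSWORD)
  .load()
jdbcDf.show()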
On Sun, Mar 31, 2024 at 8:57 PM Lionel CL <[email protected]> wrote:
> Hi Nam,
> You can refer to the spark docs
> https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
>
> Regards,
> Lu Cao
>
> From: Nam Đỗ Duy <[email protected]>
> Date: Sunday, March 31, 2024 at 08:53
> To: dev <[email protected]>, [email protected] <
> [email protected]>
> Subject: Re: How to query the Cube via API and use the dataset for other purpose
> Dear Sirs/Madams
>
> Could anyone here help me to figure out how to use Scala to run a SELECT
> query against a Kylin cube via the API, and then turn the result into a
> DataFrame in Scala for other purposes?
>
> Thank you so much for your time!
>
> Best regards
>
> On Fri, 29 Mar 2024 at 17:52 Nam Đỗ Duy <[email protected]> wrote:
>
> > Hi Xiaoxiang,
> > Sirs & Madams,
> >
> > I use the following code to query the cube via the API, but I cannot use
> > the result as a DataFrame. Could you suggest a way to do that? It is
> > very important for our project.
> >
> > Thanks and best regards
> >
> > ===================================
> >
> > import org.apache.spark.sql.{DataFrame, SparkSession}
> > import org.apache.spark.sql.functions._
> >
> > object APICaller {
> > def main(args: Array[String]): Unit = {
> > val spark = SparkSession.builder()
> > .appName("APICaller")
> > .master("local[*]")
> > .getOrCreate()
> >
> > import spark.implicits._
> >
> > val username = "namdd"
> > val password = "eer123"
> > val urlString = "http://localhost:7070/kylin/api/query"
> > val project = "learn_kylin"
> > val query = "select count(*) from HIVE_DWH_STANDARD.factuserEvent"
> >
> > val response: String = callAPI(urlString, username, password, project, query)
> >
> > // Convert response to DataFrame
> > val df = spark.read.json(Seq(response).toDS())
> >
> > // Show DataFrame
> > df.show()
> >
> > // Stop Spark session
> > spark.stop()
> > }
> >
> > def callAPI(url: String, username: String, password: String, project:
> > String, query: String): String = {
> > val encodedAuth = java.util.Base64.getEncoder.encodeToString(s"$username:$password".getBytes)
> >
> > val connection = scalaj.http.Http(url)
> > .postData(s"""{"project": "$project", "sql": "$query"}""")
> > .header("Content-Type", "application/json")
> > .header("Accept", "application/json")
> > .auth(username, password)
> > .asString
> >
> > if (connection.isError)
> > throw new RuntimeException(s"Error calling API: ${connection.body}")
> >
> > connection.body
> > }
> > }
> >
> >
>