Repository: carbondata
Updated Branches:
  refs/heads/carbonstore 239a6cadb -> 9ac55a5a6


[CARBONDATA-2736][CARBONSTORE] Kafka integration with Carbon StreamSQL

Modifications in this PR:
1. Pass the source table properties to streamReader.load()
2. Do not pass a schema when calling sparkSession.readStream for a Kafka source
3. Remove the querySchema validation against the sink, because a dataFrame built from a Kafka
source does not carry the user schema (the data is written into the fixed 'value' column)
4. Extract the dataframe holding the actual data from the Kafka source before
writeStream (see the sketch below)

This closes #2495
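
For illustration only (not part of the commit): a minimal Scala sketch of the read path this
change enables, assuming a Kafka topic carrying string-encoded rows. The broker address, topic
name, and checkpoint path are made-up placeholders, and the carbondata sink normally receives
additional sink-table options from StreamJobManager that are omitted here.

import org.apache.spark.sql.SparkSession

object KafkaStreamSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-stream-sketch").getOrCreate()

    // A Kafka source exposes a fixed schema (key, value, topic, ...),
    // so no .schema() is set on the reader
    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker
      .option("subscribe", "carbon_source_topic")          // assumed topic
      .load()

    // The actual row data lives in the binary 'value' column; cast it to STRING
    val dataFrame = kafkaDf.selectExpr("CAST(value AS STRING)")

    // In the real code path StreamJobManager attaches the carbondata sink with
    // the sink-table options; only the checkpoint location is shown here
    val job = dataFrame.writeStream
      .format("carbondata")
      .option("checkpointLocation", "/tmp/stream_checkpoint") // assumed path
      .start()

    job.awaitTermination()
  }
}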


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9ac55a5a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9ac55a5a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9ac55a5a

Branch: refs/heads/carbonstore
Commit: 9ac55a5a656ebe106697ca76a04916bea2ef3109
Parents: 239a6ca
Author: Ajith <ajith2...@gmail.com>
Authored: Thu Jul 12 09:17:22 2018 +0530
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Wed Jul 18 16:40:35 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |  9 ++++
 .../command/carbonTableSchemaCommon.scala       |  3 --
 .../carbondata/stream/StreamJobManager.scala    | 32 +++++++++----
 .../stream/CarbonCreateStreamCommand.scala      | 48 ++++++++++++--------
 4 files changed, 59 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ac55a5a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 995f943..8c54c07 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1176,6 +1176,15 @@ public class CarbonTable implements Serializable {
   }
 
   /**
+   * Return the format value defined in table properties
+   * @return String as per table properties, null if not defined
+   */
+  public String getFormat() {
+    return getTableInfo().getFactTable().getTableProperties()
+            .get("format");
+  }
+
+  /**
    * Method to get the list of cached columns of the table.
   * This method need to be used for Describe formatted like scenario where columns need to be
    * displayed in the column create order

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ac55a5a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 4f251e1..486c2fb 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -859,9 +859,6 @@ class TableNewProcessor(cm: TableModel) {
     tableInfo.setFactTable(tableSchema)
     val format = cm.tableProperties.get(CarbonCommonConstants.FORMAT)
     if (format.isDefined) {
-      if (!format.get.equalsIgnoreCase("csv")) {
-        CarbonException.analysisException(s"Currently we only support csv as external file format")
-      }
       tableInfo.setFormat(format.get)
       val formatProperties = cm.tableProperties.filter(pair =>
         pair._1.startsWith(s"${format.get.toLowerCase}.")).asJava

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ac55a5a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
index 59e924d..470d89a 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
@@ -52,19 +52,23 @@ object StreamJobManager {
     }
   }
 
-  private def validateSinkTable(querySchema: StructType, sink: CarbonTable): Unit = {
+  private def validateSinkTable(validateQuerySchema: Boolean,
+                                querySchema: StructType, sink: CarbonTable): Unit = {
     if (!sink.isStreamingSink) {
       throw new MalformedCarbonCommandException(s"Table ${sink.getTableName} is not " +
                                                 "streaming sink table " +
                                                 "('streaming' tblproperty is not 'sink' or 'true')")
     }
-    val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
-      StructField(column.getColName,
-        CarbonScalaUtil.convertCarbonToSparkDataType(column.getDataType))
-    }
-    if (!querySchema.equals(StructType(fields))) {
-      throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " +
-                                                s"does not match query output")
+    // TODO: validate query schema against sink in kafka (we cannot get schema directly)
+    if (validateQuerySchema) {
+      val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
+        StructField(column.getColName,
+          CarbonScalaUtil.convertCarbonToSparkDataType(column.getDataType))
+      }
+      if (!querySchema.equals(StructType(fields))) {
+        throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " +
+          s"does not match query output")
+      }
     }
   }
 
@@ -102,14 +106,22 @@ object StreamJobManager {
     }
 
     validateSourceTable(sourceTable)
-    validateSinkTable(streamDf.schema, sinkTable)
+
+    // kafka source always have fixed schema, need to get actual schema
+    val isKafka = Option(sourceTable.getFormat).exists(_ == "kafka")
+    val dataFrame = if (isKafka) {
+      streamDf.selectExpr("CAST(value as STRING)")
+    } else {
+      streamDf
+    }
+    validateSinkTable(!isKafka, dataFrame.schema, sinkTable)
 
     // start a new thread to run the streaming ingest job, the job will be running
     // until user stops it by STOP STREAM JOB
     val thread = new Thread(new Runnable {
       override def run(): Unit = {
         try {
-          job = streamDf.writeStream
+          job = dataFrame.writeStream
             .format("carbondata")
             .trigger(options.trigger)
             .option("checkpointLocation", 
options.checkpointLocation(sinkTable.getTablePath))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ac55a5a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
index d3b178c..c413a62 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.DataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.types.{StringType, StructType}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -53,20 +52,21 @@ case class CarbonCreateStreamCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val df = sparkSession.sql(query)
     var sourceTable: CarbonTable = null
+    var dataFrame: Option[DataFrame] = None
 
-    // find the streaming source table in the query
-    // and replace it with StreamingRelation
-    val streamLp = df.logicalPlan transform {
+    // Prepare the dataframe from the stream source table
+    df.logicalPlan transform {
       case r: LogicalRelation
        if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
           r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
-        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
+        val (source, resolvedFrame) = prepareDataFrame(sparkSession, r)
         if (sourceTable != null && sourceTable.getTableName != source.getTableName) {
           throw new MalformedCarbonCommandException(
             "Stream query on more than one stream source table is not supported")
         }
         sourceTable = source
-        streamingRelation
+        dataFrame = Option(resolvedFrame)
+        r
       case plan: LogicalPlan => plan
     }
 
@@ -82,24 +82,37 @@ case class CarbonCreateStreamCommand(
       sourceTable = sourceTable,
       sinkTable = CarbonEnv.getCarbonTable(sinkDbName, sinkTableName)(sparkSession),
       query = query,
-      streamDf = Dataset.ofRows(sparkSession, streamLp),
+      streamDf = dataFrame.getOrElse(Dataset.ofRows(sparkSession, df.logicalPlan)),
       options = new StreamingOption(optionMap)
     )
     Seq(Row(streamName, jobId, "RUNNING"))
   }
 
-  private def prepareStreamingRelation(
+  /**
+   * Create a dataframe from source table of logicalRelation
+   * @param sparkSession
+   * @param logicalRelation
+   * @return sourceTable and its stream dataFrame
+   */
+  private def prepareDataFrame(
       sparkSession: SparkSession,
-      r: LogicalRelation): (CarbonTable, StreamingRelation) = {
-    val sourceTable = r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+      logicalRelation: LogicalRelation): (CarbonTable, DataFrame) = {
+    val sourceTable = logicalRelation.relation
+      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
     val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties
     val format = tblProperty.get("format")
     if (format == null) {
      throw new MalformedCarbonCommandException("Streaming from carbon file is not supported")
     }
-    val streamReader = sparkSession.readStream
-      .schema(getSparkSchema(sourceTable))
-      .format(format)
+    val streamReader = if (format != "kafka") {
+      sparkSession.readStream
+        .schema(getSparkSchema(sourceTable))
+        .format(format)
+    } else {
+      // kafka source fixed schema, it cannot be set to a custom schema
+      sparkSession.readStream
+        .format(format)
+    }
     val dataFrame = format match {
       case "csv" | "text" | "json" | "parquet" =>
         if (!tblProperty.containsKey("path")) {
@@ -108,16 +121,11 @@ case class CarbonCreateStreamCommand(
         }
         streamReader.load(tblProperty.get("path"))
       case "kafka" | "socket" =>
-        streamReader.load()
+        streamReader.options(tblProperty).load()
       case other =>
        throw new MalformedCarbonCommandException(s"Streaming from $format is not supported")
     }
-    val streamRelation = dataFrame.logicalPlan.asInstanceOf[StreamingRelation]
-
-    // Since SparkSQL analyzer will match the UUID in attribute,
-    // create a new StreamRelation and re-use the same attribute from LogicalRelation
-    (sourceTable,
-      StreamingRelation(streamRelation.dataSource, streamRelation.sourceName, r.output))
+    (sourceTable, dataFrame)
   }
 
   private def getSparkSchema(sourceTable: CarbonTable): StructType = {
