Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2695#discussion_r215973078
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
 ---
    @@ -82,24 +91,37 @@ case class CarbonCreateStreamCommand(
           sourceTable = sourceTable,
           sinkTable = CarbonEnv.getCarbonTable(sinkDbName, 
sinkTableName)(sparkSession),
           query = query,
    -      streamDf = Dataset.ofRows(sparkSession, streamLp),
    -      options = new StreamingOption(optionMap)
    +      streamDf = dataFrame.getOrElse(Dataset.ofRows(sparkSession, 
df.logicalPlan)),
    +      options = new StreamingOption(newMap.toMap)
         )
         Seq(Row(streamName, jobId, "RUNNING"))
       }
     
    -  private def prepareStreamingRelation(
    +  /**
    +   * Create a dataframe from source table of logicalRelation
    +   * @param sparkSession
    +   * @param logicalRelation
    +   * @return sourceTable and its stream dataFrame
    +   */
    +  private def prepareDataFrame(
           sparkSession: SparkSession,
    -      r: LogicalRelation): (CarbonTable, StreamingRelation) = {
    -    val sourceTable = 
r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +      logicalRelation: LogicalRelation): (CarbonTable, DataFrame) = {
    +    val sourceTable = logicalRelation.relation
    +      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
         val tblProperty = 
sourceTable.getTableInfo.getFactTable.getTableProperties
         val format = tblProperty.get("format")
         if (format == null) {
           throw new MalformedCarbonCommandException("Streaming from carbon 
file is not supported")
         }
    -    val streamReader = sparkSession.readStream
    -      .schema(getSparkSchema(sourceTable))
    -      .format(format)
    +    val streamReader = if (format != "kafka") {
    --- End diff --
    
    use 'equals' instead of '='


---

Reply via email to