Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2695#discussion_r215973078
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
---
@@ -82,24 +91,37 @@ case class CarbonCreateStreamCommand(
sourceTable = sourceTable,
sinkTable = CarbonEnv.getCarbonTable(sinkDbName,
sinkTableName)(sparkSession),
query = query,
- streamDf = Dataset.ofRows(sparkSession, streamLp),
- options = new StreamingOption(optionMap)
+ streamDf = dataFrame.getOrElse(Dataset.ofRows(sparkSession,
df.logicalPlan)),
+ options = new StreamingOption(newMap.toMap)
)
Seq(Row(streamName, jobId, "RUNNING"))
}
- private def prepareStreamingRelation(
+ /**
+ * Create a dataframe from source table of logicalRelation
+ * @param sparkSession
+ * @param logicalRelation
+ * @return sourceTable and its stream dataFrame
+ */
+ private def prepareDataFrame(
sparkSession: SparkSession,
- r: LogicalRelation): (CarbonTable, StreamingRelation) = {
- val sourceTable =
r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+ logicalRelation: LogicalRelation): (CarbonTable, DataFrame) = {
+ val sourceTable = logicalRelation.relation
+ .asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
val tblProperty =
sourceTable.getTableInfo.getFactTable.getTableProperties
val format = tblProperty.get("format")
if (format == null) {
throw new MalformedCarbonCommandException("Streaming from carbon
file is not supported")
}
- val streamReader = sparkSession.readStream
- .schema(getSparkSchema(sourceTable))
- .format(format)
+ val streamReader = if (format != "kafka") {
--- End diff --
use 'equals' (i.e. `!format.equals("kafka")`) instead of the `!=` operator
---