[CARBONDATA-2215][Documentation] Describe CarbonStreamParser in streaming-guide.md
This closes #2016


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/87361a80
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/87361a80
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/87361a80

Branch: refs/heads/branch-1.3
Commit: 87361a8069503a6a3fa6b31e54ed9849259c81c9
Parents: 28c3701
Author: Zhang Zhichao <441586...@qq.com>
Authored: Wed Feb 28 23:07:38 2018 +0800
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Sat Mar 3 17:47:42 2018 +0530

----------------------------------------------------------------------
 docs/streaming-guide.md | 74 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 74 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/87361a80/docs/streaming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
index 201f8e0..aa9eaef 100644
--- a/docs/streaming-guide.md
+++ b/docs/streaming-guide.md
@@ -152,6 +152,80 @@ property name | default | description
 --- | --- | ---
 carbon.streaming.auto.handoff.enabled | true | whether to auto trigger handoff operation
 
+## Stream data parser
+Config the property "carbon.stream.parser" to define a stream parser to convert InternalRow to Object[] when write stream data.
+
+property name | default | description
+--- | --- | ---
+carbon.stream.parser | org.apache.carbondata.streaming.parser.CSVStreamParserImp | the class of the stream parser
+
+Currently CarbonData support two parsers, as following:
+
+**1. org.apache.carbondata.streaming.parser.CSVStreamParserImp**: This is the default stream parser, it gets a line data(String type) from the first index of InternalRow and converts this String to Object[].
+
+**2. org.apache.carbondata.streaming.parser.RowStreamParserImp**: This stream parser will auto convert InternalRow to Object[] according to schema of this `DataSet`, for example:
+
+```scala
+ case class FileElement(school: Array[String], age: Int)
+ case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)
+ ...
+
+ var qry: StreamingQuery = null
+ val readSocketDF = spark.readStream
+   .format("socket")
+   .option("host", "localhost")
+   .option("port", 9099)
+   .load()
+   .as[String]
+   .map(_.split(","))
+   .map { fields => {
+     val tmp = fields(4).split("\\$")
+     val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
+     StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat, file)
+   } }
+
+ // Write data from socket stream to carbondata file
+ qry = readSocketDF.writeStream
+   .format("carbondata")
+   .trigger(ProcessingTime("5 seconds"))
+   .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+   .option("dbName", "default")
+   .option("tableName", "carbon_table")
+   .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+     CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+   .start()
+
+ ...
+```
+
+### How to implement a customized stream parser
+If user needs to implement a customized stream parser to convert a specific InternalRow to Object[], it needs to implement `initialize` method and `parserRow` method of interface `CarbonStreamParser`, for example:
+
+```scala
+ package org.XXX.XXX.streaming.parser
+
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.types.StructType
+
+ class XXXStreamParserImp extends CarbonStreamParser {
+
+   override def initialize(configuration: Configuration, structType: StructType): Unit = {
+     // user can get the properties from "configuration"
+   }
+
+   override def parserRow(value: InternalRow): Array[Object] = {
+     // convert InternalRow to Object[](Array[Object] in Scala)
+   }
+
+   override def close(): Unit = {
+   }
+ }
+
+```
+
+and then set the property "carbon.stream.parser" to "org.XXX.XXX.streaming.parser.XXXStreamParserImp".
+
 ## Close streaming table
 Use below command to handoff all streaming segments to columnar format segments and modify the streaming property to false, this table becomes a normal table.
 ```sql