GitHub user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1867#discussion_r170856116 --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala --- @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.streaming + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink +import org.apache.spark.sql.execution.streaming.Sink +import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.SparkSession +import org.apache.spark.streaming.Time + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.schema.table.CarbonTable + +class CarbonStreamSparkStreamingWriter { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + private var isInitialize: Boolean = false + + private var lock: ICarbonLock = null + private var carbonTable: CarbonTable = null + private var configuration: Configuration = null + private var carbonAppendableStreamSink: Sink = null + private val sparkSession: SparkSession = SparkSession.builder().getOrCreate() + + def this(carbonTable: CarbonTable, configuration: Configuration) { + this() + this.carbonTable = carbonTable + this.configuration = configuration + this.option("dbName", carbonTable.getDatabaseName) + this.option("tableName", carbonTable.getTableName) + } + + /** + * Acquired the lock for stream table + */ + def lockStreamTable(): Unit = { + lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, + LockUsage.STREAMING_LOCK) + if (lock.lockWithRetries()) { + LOGGER.info("Acquired the lock for stream table: " + + carbonTable.getDatabaseName + "." + + carbonTable.getTableName) + } else { + LOGGER.error("Not able to acquire the lock for stream table:" + + carbonTable.getDatabaseName + "." + carbonTable.getTableName) + throw new InterruptedException( + "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." 
+ + carbonTable.getTableName) + } + } + + /** + * unlock for stream table + */ + def unLockStreamTable(): Unit = { + if (null != lock) { + lock.unlock() + LOGGER.info("unlock for stream table: " + + carbonTable.getDatabaseName + "." + + carbonTable.getTableName) + } + } + + def initialize(): Unit = { + carbonAppendableStreamSink = StreamSinkFactory.createStreamTableSink( + sparkSession, + configuration, + carbonTable, + extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink] + + lockStreamTable() + + isInitialize = true + } + + def writeStreamData(dataFrame: DataFrame, time: Time): Unit = { + if (!isInitialize) { + initialize() + } + carbonAppendableStreamSink.addBatch(time.milliseconds, dataFrame) + } + + private val extraOptions = new scala.collection.mutable.HashMap[String, String] + private var mode: SaveMode = SaveMode.ErrorIfExists + + /** + * Specifies the behavior when data or table already exists. Options include: + * - `SaveMode.Overwrite`: overwrite the existing data. + * - `SaveMode.Append`: append the data. + * - `SaveMode.Ignore`: ignore the operation (i.e. no-op). + * - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. + */ + def mode(saveMode: SaveMode): CarbonStreamSparkStreamingWriter = { + if (mode == SaveMode.ErrorIfExists) { + mode = saveMode + } + this + } + + /** + * Specifies the behavior when data or table already exists. Options include: + * - `SaveMode.Overwrite`: overwrite the existing data. --- End diff -- saveMode parameter is String, change the description
---