Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1867#discussion_r170856116
  
    --- Diff: 
streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
 ---
    @@ -0,0 +1,187 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import java.util
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.sql.DataFrame
    +import org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink
    +import org.apache.spark.sql.execution.streaming.Sink
    +import org.apache.spark.sql.SaveMode
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.streaming.Time
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, 
LockUsage}
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +
    +class CarbonStreamSparkStreamingWriter {
    +
    +  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  private var isInitialize: Boolean = false
    +
    +  private var lock: ICarbonLock = null
    +  private var carbonTable: CarbonTable = null
    +  private var configuration: Configuration = null
    +  private var carbonAppendableStreamSink: Sink = null
    +  private val sparkSession: SparkSession = 
SparkSession.builder().getOrCreate()
    +
    +  def this(carbonTable: CarbonTable, configuration: Configuration) {
    +    this()
    +    this.carbonTable = carbonTable
    +    this.configuration = configuration
    +    this.option("dbName", carbonTable.getDatabaseName)
    +    this.option("tableName", carbonTable.getTableName)
    +  }
    +
    +  /**
    +   * Acquired the lock for stream table
    +   */
    +  def lockStreamTable(): Unit = {
    +    lock = 
CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
    +      LockUsage.STREAMING_LOCK)
    +    if (lock.lockWithRetries()) {
    +      LOGGER.info("Acquired the lock for stream table: " +
    +                  carbonTable.getDatabaseName + "." +
    +                  carbonTable.getTableName)
    +    } else {
    +      LOGGER.error("Not able to acquire the lock for stream table:" +
    +                   carbonTable.getDatabaseName + "." + 
carbonTable.getTableName)
    +      throw new InterruptedException(
    +        "Not able to acquire the lock for stream table: " + 
carbonTable.getDatabaseName + "." +
    +        carbonTable.getTableName)
    +    }
    +  }
    +
    +  /**
    +   * unlock for stream table
    +   */
    +  def unLockStreamTable(): Unit = {
    +    if (null != lock) {
    +      lock.unlock()
    +      LOGGER.info("unlock for stream table: " +
    +                  carbonTable.getDatabaseName + "." +
    +                  carbonTable.getTableName)
    +    }
    +  }
    +
    +  def initialize(): Unit = {
    +    carbonAppendableStreamSink = StreamSinkFactory.createStreamTableSink(
    +      sparkSession,
    +      configuration,
    +      carbonTable,
    +      extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink]
    +
    +    lockStreamTable()
    +
    +    isInitialize = true
    +  }
    +
    +  def writeStreamData(dataFrame: DataFrame, time: Time): Unit = {
    +    if (!isInitialize) {
    +      initialize()
    +    }
    +    carbonAppendableStreamSink.addBatch(time.milliseconds, dataFrame)
    +  }
    +
    +  private val extraOptions = new scala.collection.mutable.HashMap[String, 
String]
    +  private var mode: SaveMode = SaveMode.ErrorIfExists
    +
    +  /**
    +   * Specifies the behavior when data or table already exists. Options 
include:
    +   *   - `SaveMode.Overwrite`: overwrite the existing data.
    +   *   - `SaveMode.Append`: append the data.
    +   *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
    +   *   - `SaveMode.ErrorIfExists`: default option, throw an exception at 
runtime.
    +   */
    +  def mode(saveMode: SaveMode): CarbonStreamSparkStreamingWriter = {
    +    if (mode == SaveMode.ErrorIfExists) {
    +      mode = saveMode
    +    }
    +    this
    +  }
    +
    +  /**
    +   * Specifies the behavior when data or table already exists. Options 
include:
    +   *   - `SaveMode.Overwrite`: overwrite the existing data.
    --- End diff --
    
    saveMode parameter is String, change the description


---

Reply via email to