http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala index a7d831d..f426d65 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala @@ -17,11 +17,42 @@ package org.apache.spark.sql.execution.command -import org.apache.carbondata.common.logging.LogServiceFactory +import java.io.IOException +import java.text.SimpleDateFormat +import java.util + +import org.apache.carbondata.common.iudprocessor.iuddata.RowCountDetailsVO +import org.apache.carbondata.core.datastorage.store.impl.FileFactory +import org.apache.carbondata.core.update.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum} +import org.apache.carbondata.core.updatestatus.{SegmentStatusManager, SegmentUpdateStatusManager} +import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl +import org.apache.carbondata.processing.exception.MultipleMatchingException +import org.apache.carbondata.spark.util.QueryPlanUtil +import org.apache.spark.sql.catalyst.TableIdentifier +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan} import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.FileUtils + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier +import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.carbon.path.{CarbonTablePath, CarbonStorePath} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} +import org.apache.carbondata.spark.load.{CarbonLoaderUtil, FailureCauses} +import org.apache.carbondata.spark.merger.CarbonDataMergerUtil._ +import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CompactionType, CarbonDataMergerUtilResult} +import org.apache.carbondata.spark.DeleteDelataResultImpl + /** * IUD update delete and compaction framework. @@ -30,14 +61,73 @@ import org.apache.spark.sql.execution.RunnableCommand private[sql] case class ProjectForDeleteCommand( plan: LogicalPlan, - tableIdentifier: Seq[String], + identifier: Seq[String], timestamp: String) extends RunnableCommand { val LOG = LogServiceFactory.getLogService(this.getClass.getName) var horizontalCompactionFailed = false override def run(sqlContext: SQLContext): Seq[Row] = { - DataFrame(sqlContext, plan).show(truncate = false) + + val dataFrame = DataFrame(sqlContext, plan) + val dataRdd = dataFrame.rdd + + val relation = CarbonEnv.get.carbonMetastore + .lookupRelation1(deleteExecution.getTableIdentifier(identifier))(sqlContext). + asInstanceOf[CarbonRelation] + val carbonTable = relation.tableMeta.carbonTable + val metadataLock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + LockUsage.METADATA_LOCK) + var lockStatus = false + try { + lockStatus = metadataLock.lockWithRetries() + LOG.audit(s" Delete data request has been received " + + s"for ${ relation.databaseName }.${ relation.tableName }.") + if (lockStatus) { + LOG.info("Successfully able to get the table metadata file lock") + } + else { + throw new Exception("Table is locked for deletion. Please try after some time") + } + val tablePath = CarbonStorePath.getCarbonTablePath( + carbonTable.getStorePath, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier) + var executorErrors = new ExecutionErrors(FailureCauses.NONE, "") + + // handle the clean up of IUD. + CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) + + if (deleteExecution + .deleteDeltaExecution(identifier, sqlContext, dataRdd, timestamp, relation, + false, executorErrors)) { + // call IUD Compaction. + IUDCommon.tryHorizontalCompaction(sqlContext, relation, isUpdateOperation = false) + } + } catch { + case e: HorizontalCompactionException => + LOG.error("Delete operation passed. Exception in Horizontal Compaction." + + " Please check logs. " + e.getMessage) + CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString) + + case e: Exception => + LOG.error("Exception in Delete data operation " + e.getMessage) + // ****** start clean up. + // In case of failure , clean all related delete delta files + CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) + + // clean up. Null check is required as for executor error some times message is null + if (null != e.getMessage) { + sys.error("Delete data operation is failed. " + e.getMessage) + } + else { + sys.error("Delete data operation is failed. Please check logs.") + } + } finally { + if (lockStatus) { + CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK) + } + } Seq.empty } } @@ -47,9 +137,721 @@ private[sql] case class ProjectForUpdateCommand( val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName) override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution - .EXECUTION_ID_KEY, null) - DataFrame(sqlContext, plan).show(truncate = false) + + val res = plan find { + case relation: LogicalRelation if (relation.relation + .isInstanceOf[CarbonDatasourceRelation]) => + true + case _ => false + } + + if (!res.isDefined) { + return Seq.empty + } + + val relation = CarbonEnv.get.carbonMetastore + .lookupRelation1(deleteExecution.getTableIdentifier(tableIdentifier))(sqlContext). + asInstanceOf[CarbonRelation] + val carbonTable = relation.tableMeta.carbonTable + val metadataLock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + LockUsage.METADATA_LOCK) + var lockStatus = false + // get the current time stamp which should be same for delete and update. + val currentTime = CarbonUpdateUtil.readCurrentTime + var dataFrame: DataFrame = null + val isPersistEnabledUserValue = CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.isPersistEnabled, + CarbonCommonConstants.defaultValueIsPersistEnabled) + var isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled.toBoolean + if (isPersistEnabledUserValue.equalsIgnoreCase("false")) { + isPersistEnabled = false + } + else if (isPersistEnabledUserValue.equalsIgnoreCase("true")) { + isPersistEnabled = true + } + try { + lockStatus = metadataLock.lockWithRetries() + if (lockStatus) { + logInfo("Successfully able to get the table metadata file lock") + } + else { + throw new Exception("Table is locked for updation. Please try after some time") + } + val tablePath = CarbonStorePath.getCarbonTablePath( + carbonTable.getStorePath, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier) + // Get RDD. + dataFrame = if (isPersistEnabled) { + DataFrame(sqlContext, plan) + .persist(StorageLevel.MEMORY_AND_DISK) + } + else { + DataFrame(sqlContext, plan) + } + var executionErrors = new ExecutionErrors(FailureCauses.NONE, "") + + + // handle the clean up of IUD. + CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) + + // do delete operation. + deleteExecution.deleteDeltaExecution(tableIdentifier, sqlContext, dataFrame.rdd, + currentTime + "", + relation, isUpdateOperation = true, executionErrors) + + if(executionErrors.failureCauses != FailureCauses.NONE) { + throw new Exception(executionErrors.errorMsg) + } + + // do update operation. + UpdateExecution.performUpdate(dataFrame, tableIdentifier, plan, + sqlContext, currentTime, executionErrors) + + if(executionErrors.failureCauses != FailureCauses.NONE) { + throw new Exception(executionErrors.errorMsg) + } + + // Do IUD Compaction. + IUDCommon.tryHorizontalCompaction(sqlContext, relation, isUpdateOperation = true) + } + + catch { + case e: HorizontalCompactionException => + LOGGER.error( + "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e) + // In case of failure , clean all related delta files + CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString) + + case e: Exception => + LOGGER.error("Exception in update operation" + e) + // ****** start clean up. + // In case of failure , clean all related delete delta files + CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "") + + // *****end clean up. + if (null != e.getMessage) { + sys.error("Update operation failed. " + e.getMessage) + } + if (null != e.getCause && null != e.getCause.getMessage) { + sys.error("Update operation failed. " + e.getCause.getMessage) + } + sys.error("Update operation failed. please check logs.") + } + finally { + if (null != dataFrame && isPersistEnabled) { + dataFrame.unpersist() + } + if (lockStatus) { + CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK) + } + } + Seq.empty + } +} + +object IUDCommon { + + val LOG = LogServiceFactory.getLogService(this.getClass.getName) + + /** + * The method does horizontal compaction. After Update and Delete completion + * tryHorizontal compaction will be called. In case this method is called after + * Update statement then Update Compaction followed by Delete Compaction will be + * processed whereas for tryHorizontalCompaction called after Delete statement + * then only Delete Compaction will be processed. + * + * @param sqlContext + * @param carbonRelation + * @param isUpdateOperation + */ + def tryHorizontalCompaction(sqlContext: SQLContext, + carbonRelation: CarbonRelation, + isUpdateOperation: Boolean): Unit = { + + var ishorizontalCompaction = isHorizontalCompactionEnabled() + + if (ishorizontalCompaction == false) { + return + } + + var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION + val carbonTable = carbonRelation.tableMeta.carbonTable + val (db, table) = (carbonTable.getDatabaseName, carbonTable.getFactTableName) + val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier + val updateTimeStamp = System.currentTimeMillis() + // To make sure that update and delete timestamps are not same, + // required to commit to status metadata and cleanup + val deleteTimeStamp = updateTimeStamp + 1 + + // get the valid segments + var segLists = getValidSegmentList(absTableIdentifier) + + if (segLists == null || segLists.size() == 0) { + return + } + + // Should avoid reading Table Status file from Disk every time. Better to load it + // in-memory at the starting and pass it along the routines. The constructor of + // SegmentUpdateStatusManager reads the Table Status File and Table Update Status + // file and save the content in segmentDetails and updateDetails respectively. + val segmentUpdateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager( + absTableIdentifier) + + if (isUpdateOperation == true) { + + // This is only update operation, perform only update compaction. + compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION + performUpdateDeltaCompaction(sqlContext, + compactionTypeIUD, + carbonTable, + absTableIdentifier, + segmentUpdateStatusManager, + updateTimeStamp, + segLists) + } + + // After Update Compaction perform delete compaction + compactionTypeIUD = CompactionType.IUD_DELETE_DELTA_COMPACTION + segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier) + if (segLists == null || segLists.size() == 0) { + return + } + + // Delete Compaction + performDeleteDeltaCompaction(sqlContext, + compactionTypeIUD, + carbonTable, + absTableIdentifier, + segmentUpdateStatusManager, + deleteTimeStamp, + segLists) + } + + /** + * Update Delta Horizontal Compaction. + * + * @param sqlContext + * @param compactionTypeIUD + * @param carbonTable + * @param absTableIdentifier + * @param segLists + */ + private def performUpdateDeltaCompaction(sqlContext: SQLContext, + compactionTypeIUD: CompactionType, + carbonTable: CarbonTable, + absTableIdentifier: AbsoluteTableIdentifier, + segmentUpdateStatusManager: SegmentUpdateStatusManager, + factTimeStamp: Long, + segLists: util.List[String]): Unit = { + val db = carbonTable.getDatabaseName + val table = carbonTable.getFactTableName + // get the valid segments qualified for update compaction. + val validSegList = getSegListIUDCompactionQualified(segLists, + absTableIdentifier, + segmentUpdateStatusManager, + compactionTypeIUD) + + if (validSegList.size() == 0) { + return + } + + LOG.info(s"Horizontal Update Compaction operation started for [${db}.${table}].") + LOG.audit(s"Horizontal Update Compaction operation started for [${db}.${table}].") + + + if (compactionTypeIUD == CompactionType.IUD_FACTFILE_COMPACTION || + compactionTypeIUD == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) { + + try { + // Update Compaction. + if (compactionTypeIUD == CompactionType.IUD_FACTFILE_COMPACTION) { + val altertablemodel = AlterTableModel(Option(carbonTable.getDatabaseName), + carbonTable.getFactTableName, + Some(segmentUpdateStatusManager), + CompactionType.IUD_FACTFILE_COMPACTION.toString, + Some(factTimeStamp), + "") + + AlterTableCompaction(altertablemodel).run(sqlContext) + return + } + else { + val altertablemodel = AlterTableModel(Option(carbonTable.getDatabaseName), + carbonTable.getFactTableName, + Some(segmentUpdateStatusManager), + CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString, + Some(factTimeStamp), + "") + + AlterTableCompaction(altertablemodel).run(sqlContext) + } + } + catch { + case e: Exception => + val msg = if (null != e.getMessage) { + e.getMessage + } else { + "Please check logs for more info" + } + throw new HorizontalCompactionException( + s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp) + } + } + LOG.info(s"Horizontal Update Compaction operation completed for [${db}.${table}].") + LOG.audit(s"Horizontal Update Compaction operation completed for [${db}.${table}].") + } + + /** + * Delete Delta Horizontal Compaction. + * + * @param sqlContext + * @param compactionTypeIUD + * @param carbonTable + * @param absTableIdentifier + * @param segLists + */ + private def performDeleteDeltaCompaction(sqlContext: SQLContext, + compactionTypeIUD: CompactionType, + carbonTable: CarbonTable, + absTableIdentifier: AbsoluteTableIdentifier, + segmentUpdateStatusManager: SegmentUpdateStatusManager, + factTimeStamp: Long, + segLists: util.List[String]): Unit = { + + val db = carbonTable.getDatabaseName + val table = carbonTable.getFactTableName + val deletedBlocksList = getSegListIUDCompactionQualified(segLists, + absTableIdentifier, + segmentUpdateStatusManager, + compactionTypeIUD) + + if (deletedBlocksList.size() == 0) { + return + } + + LOG.info(s"Horizontal Delete Compaction operation started for [${db}.${table}].") + LOG.audit(s"Horizontal Delete Compaction operation started for [${db}.${table}].") + + try { + + // Delete Compaction RDD + val rdd1 = sqlContext.sparkContext + .parallelize(deletedBlocksList.asScala.toSeq, deletedBlocksList.size()) + + val timestamp = factTimeStamp + val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails + val result = rdd1.mapPartitions(iter => + new Iterator[Seq[CarbonDataMergerUtilResult]] { + override def hasNext: Boolean = iter.hasNext + + override def next(): Seq[CarbonDataMergerUtilResult] = { + val segmentAndBlocks = iter.next + val segment = segmentAndBlocks.substring(0, segmentAndBlocks.lastIndexOf("/")) + val blockName = segmentAndBlocks + .substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length) + + val result = compactBlockDeleteDeltaFiles(segment, blockName, + absTableIdentifier, + updateStatusDetails, + timestamp) + + result.asScala.toList + + } + }).collect + + val resultList = ListBuffer[CarbonDataMergerUtilResult]() + result.foreach(x => { + x.foreach(y => { + resultList += y + }) + }) + + val updateStatus = updateStatusFile(resultList.toList.asJava, + carbonTable, + timestamp.toString, + segmentUpdateStatusManager) + if (updateStatus == false) { + LOG.audit(s"Delete Compaction data operation is failed for [${db}.${table}].") + LOG.error("Delete Compaction data operation is failed.") + throw new HorizontalCompactionException( + s"Horizontal Delete Compaction Failed for [${db}.${table}] ." + + s" Please check logs for more info.", factTimeStamp) + } + else { + LOG.info(s"Horizontal Delete Compaction operation completed for [${db}.${table}].") + LOG.audit(s"Horizontal Delete Compaction operation completed for [${db}.${table}].") + } + } + catch { + case e: Exception => + val msg = if (null != e.getMessage) { + e.getMessage + } else { + "Please check logs for more info" + } + throw new HorizontalCompactionException( + s"Horizontal Delete Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp) + } + } +} + +class HorizontalCompactionException( + message: String, + // required for cleanup + val compactionTimeStamp: Long) extends Exception(message) { +} + +object deleteExecution { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + def getTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = { + if (tableIdentifier.size > 1) { + TableIdentifier(tableIdentifier(1), Some(tableIdentifier(0))) + } else { + TableIdentifier(tableIdentifier(0), None) + } + } + + def deleteDeltaExecution(identifier: Seq[String], + sqlContext: SQLContext, + dataRdd: RDD[Row], + timestamp: String, relation: CarbonRelation, isUpdateOperation: Boolean, + executorErrors: ExecutionErrors): Boolean = { + + var res: Array[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] = null + val tableName = getTableIdentifier(identifier).table + val database = getDB.getDatabaseName(getTableIdentifier(identifier).database, sqlContext) + val relation = CarbonEnv.get.carbonMetastore + .lookupRelation1(getTableIdentifier(identifier))(sqlContext). + asInstanceOf[CarbonRelation] + + val storeLocation = relation.tableMeta.storePath + val absoluteTableIdentifier: AbsoluteTableIdentifier = new + AbsoluteTableIdentifier(storeLocation, + relation.tableMeta.carbonTableIdentifier) + var tablePath = CarbonStorePath + .getCarbonTablePath(storeLocation, + absoluteTableIdentifier.getCarbonTableIdentifier()) + var tableUpdateStatusPath = tablePath.getTableUpdateStatusFilePath + val totalSegments = SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath).length + var factPath = tablePath.getFactDir + + var carbonTable = relation.tableMeta.carbonTable + var deleteStatus = true + val deleteRdd = if (isUpdateOperation) { + val schema = + org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField( + CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, + org.apache.spark.sql.types.StringType))) + val rdd = dataRdd + .map(row => Row(row.get(row.fieldIndex( + CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)))) + sqlContext.createDataFrame(rdd, schema).rdd + } else { + dataRdd + } + + val (carbonInputFormat, job) = + QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier) + + val keyRdd = deleteRdd.map({ row => + val tupleId: String = row + .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) + val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId) + (key, row) + }).groupByKey() + + // if no loads are present then no need to do anything. + if (keyRdd.partitions.size == 0) { + return true + } + + var blockMappingVO = carbonInputFormat.getBlockRowCount(job, absoluteTableIdentifier) + val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier) + CarbonUpdateUtil + .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr) + + val rowContRdd = sqlContext.sparkContext + .parallelize(blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq, + keyRdd.partitions.size) + + val rdd = rowContRdd.join(keyRdd) + + res = rdd.mapPartitionsWithIndex( + (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) => + Iterator[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] { + + var result = List[(String, (SegmentUpdateDetails, ExecutionErrors))]() + while (records.hasNext) { + val ((key), (rowCountDetailsVO, groupedRows)) = records.next + result = result ++ + deleteDeltaFunc(index, + key, + groupedRows.toIterator, + timestamp, + rowCountDetailsVO) + + } + result + } + ).collect() + + // if no loads are present then no need to do anything. + if (res.isEmpty) { + return true + } + + // update new status file + checkAndUpdateStatusFiles + + // all or none : update status file, only if complete delete opeartion is successfull. + def checkAndUpdateStatusFiles: Unit = { + val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]() + val segmentDetails = new util.HashSet[String]() + res.foreach(resultOfSeg => resultOfSeg.foreach( + resultOfBlock => { + if (resultOfBlock._1.equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)) { + blockUpdateDetailsList.add(resultOfBlock._2._1) + segmentDetails.add(resultOfBlock._2._1.getSegmentName) + // if this block is invalid then decrement block count in map. + if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getStatus)) { + CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1, + blockMappingVO.getSegmentNumberOfBlockMapping) + } + } + else { + deleteStatus = false + // In case of failure , clean all related delete delta files + CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) + LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }") + val errorMsg = + "Delete data operation is failed due to failure in creating delete delta file for " + + "segment : " + resultOfBlock._2._1.getSegmentName + " block : " + + resultOfBlock._2._1.getBlockName + executorErrors.failureCauses = resultOfBlock._2._2.failureCauses + executorErrors.errorMsg = resultOfBlock._2._2.errorMsg + + if (executorErrors.failureCauses == FailureCauses.NONE) { + executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE + executorErrors.errorMsg = errorMsg + } + LOGGER.error(errorMsg) + return + } + } + ) + ) + + val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil + .getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping) + + + + // this is delete flow so no need of putting timestamp in the status file. + if (CarbonUpdateUtil + .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, false) && + CarbonUpdateUtil + .updateTableMetadataStatus(segmentDetails, + carbonTable, + timestamp, + !isUpdateOperation, + listOfSegmentToBeMarkedDeleted) + ) { + LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }") + LOGGER.audit(s"Delete data operation is successful for ${ database }.${ tableName }") + } + else { + // In case of failure , clean all related delete delta files + CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) + + val errorMessage = "Delete data operation is failed due to failure " + + "in table status updation." + LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }") + LOGGER.error("Delete data operation is failed due to failure in table status updation.") + executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE + executorErrors.errorMsg = errorMessage + // throw new Exception(errorMessage) + } + } + + def deleteDeltaFunc(index: Int, + key: String, + iter: Iterator[Row], + timestamp: String, + rowCountDetailsVO: RowCountDetailsVO): + Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] = { + + val result = new DeleteDelataResultImpl() + var deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + // here key = segment/blockName + val blockName = CarbonUpdateUtil + .getBlockName( + CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1))) + val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0) + var deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName) + val resultIter = new Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] { + val segmentUpdateDetails = new SegmentUpdateDetails() + var TID = "" + var countOfRows = 0 + try { + while (iter.hasNext) { + val oneRow = iter.next + TID = oneRow + .get(oneRow.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).toString + val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET) + val blockletId = CarbonUpdateUtil + .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID) + val IsValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset) + // stop delete operation + if(!IsValidOffset) { + executorErrors.failureCauses = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING + executorErrors.errorMsg = "Multiple input rows matched for same row." + throw new MultipleMatchingException("Multiple input rows matched for same row.") + } + countOfRows = countOfRows + 1 + } + + val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath) + val completeBlockName = CarbonTablePath + .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) + + CarbonCommonConstants.FACT_FILE_EXT) + val deleteDeletaPath = CarbonUpdateUtil + .getDeleteDeltaFilePath(blockPath, blockName, timestamp) + val carbonDeleteWriter = new CarbonDeleteDeltaWriterImpl(deleteDeletaPath, + FileFactory.getFileType(deleteDeletaPath)) + + + + segmentUpdateDetails.setBlockName(blockName) + segmentUpdateDetails.setActualBlockName(completeBlockName) + segmentUpdateDetails.setSegmentName(segmentId) + segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp) + segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp) + + val alreadyDeletedRows: Long = rowCountDetailsVO.getDeletedRowsInBlock + val totalDeletedRows: Long = alreadyDeletedRows + countOfRows + segmentUpdateDetails.setDeletedRowsInBlock(totalDeletedRows.toString) + if (totalDeletedRows == rowCountDetailsVO.getTotalNumberOfRows) { + segmentUpdateDetails.setStatus(CarbonCommonConstants.MARKED_FOR_DELETE) + } + else { + // write the delta file + carbonDeleteWriter.write(deleteDeltaBlockDetails) + } + + deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS + } catch { + case e : MultipleMatchingException => + LOGGER.audit(e.getMessage) + LOGGER.error(e.getMessage) + // dont throw exception here. + case e: Exception => + val errorMsg = s"Delete data operation is failed for ${ database }.${ tableName }." + LOGGER.audit(errorMsg) + LOGGER.error(errorMsg + e.getMessage) + throw e + } + + + var finished = false + + override def hasNext: Boolean = { + if (!finished) { + finished = true + finished + } + else { + !finished + } + } + + override def next(): (String, (SegmentUpdateDetails, ExecutionErrors)) = { + finished = true + result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors)) + } + } + resultIter + } + true + } +} + + + +object UpdateExecution { + + def performUpdate(dataFrame: DataFrame, tableIdentifier: Seq[String], plan: LogicalPlan, + sqlContext: SQLContext, currentTime: Long, executorErrors: ExecutionErrors): Unit = { + + def isDestinationRelation(relation: CarbonDatasourceRelation): Boolean = { + + // Raghu Huawei IUD + val tableName = ""// relation.getTableName() + val dbName = ""// relation.getDatabaseName() + (tableIdentifier.size > 1 && + tableIdentifier(0) == dbName && + tableIdentifier(1) == tableName) || + (tableIdentifier(0) == tableName) + } + def getHeader(relation: CarbonDatasourceRelation, plan: LogicalPlan): String = { + var header = "" + var found = false + + plan match { + case Project(pList, _) if (!found) => + found = true + header = pList + .filter(field => !field.name + .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) + .map(col => if (col.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION)) { + col.name + .substring(0, col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)) + } + else { + col.name + }).mkString(",") + } + header + } + val ex = dataFrame.queryExecution.analyzed + val res = ex find { + case relation: LogicalRelation if (relation.relation.isInstanceOf[CarbonDatasourceRelation] && + isDestinationRelation(relation.relation + .asInstanceOf[CarbonDatasourceRelation])) => + true + case _ => false + } + val carbonRelation: CarbonDatasourceRelation = res match { + case Some(relation: LogicalRelation) => + relation.relation.asInstanceOf[CarbonDatasourceRelation] + case _ => sys.error("") + } + + val updateTableModel = UpdateTableModel(true, currentTime, executorErrors) + + val header = getHeader(carbonRelation, plan) + + // Raghu Huawei IUD +// LoadTable( +// Some(carbonRelation.getDatabaseName()), +// carbonRelation.getTableName(), +// null, +// Seq(), +// Map(("fileheader" -> header)), +// false, +// null, +// Some(dataFrame), +// Some(updateTableModel)).run(sqlContext) + // Raghu Huawei IUD end + + executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg + executorErrors.failureCauses = updateTableModel.executorErrors.failureCauses + Seq.empty + } + }
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala index 924d91a..3098b43 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala @@ -24,8 +24,9 @@ import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastorage.store.impl.FileFactory +import org.apache.carbondata.core.updatestatus.SegmentStatusManager import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.lcm.status.SegmentStatusManager +import org.apache.carbondata.core.updatestatus.SegmentStatusManager import org.apache.spark.sql.common.util.CarbonHiveContext._ import org.apache.spark.sql.common.util.QueryTest import org.scalatest.BeforeAndAfterAll @@ -113,22 +114,26 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll { sql("clean files for table table2") // check for table 1. - val identifier1 = new AbsoluteTableIdentifier( + val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new + AbsoluteTableIdentifier( CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION), - new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table1", "rrr")) + new CarbonTableIdentifier("default", "table1", "rrr") + ) + ) // merged segment should not be there - val segments = SegmentStatusManager.getSegmentStatus(identifier1) - .getValidSegments.asScala.toList + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList assert(segments.contains("0.1")) assert(!segments.contains("0")) assert(!segments.contains("1")) // check for table 2. - val identifier2 = new AbsoluteTableIdentifier( + val segmentStatusManager2: SegmentStatusManager = new SegmentStatusManager(new + AbsoluteTableIdentifier( CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION), - new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table2", "rrr1")) + new CarbonTableIdentifier("default", "table2", "rrr1") + ) + ) // merged segment should not be there - val segments2 = SegmentStatusManager.getSegmentStatus(identifier2) - .getValidSegments.asScala.toList + val segments2 = segmentStatusManager2.getValidAndInvalidSegments.getValidSegments.asScala.toList assert(segments2.contains("0.1")) assert(!segments2.contains("0")) assert(!segments2.contains("1")) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperations.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperations.java b/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperations.java deleted file mode 100644 index 92dd825..0000000 --- a/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperations.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.lcm.fileoperations; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -public interface AtomicFileOperations { - - DataInputStream openForRead() throws IOException; - - void close() throws IOException; - - DataOutputStream openForWrite(FileWriteOperation operation) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperationsImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperationsImpl.java b/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperationsImpl.java deleted file mode 100644 index 6c56515..0000000 --- a/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/AtomicFileOperationsImpl.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.lcm.fileoperations; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile; -import org.apache.carbondata.core.datastorage.store.impl.FileFactory; -import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType; - -public class AtomicFileOperationsImpl implements AtomicFileOperations { - - private String filePath; - - private FileType fileType; - - private String tempWriteFilePath; - - private DataOutputStream dataOutStream; - - public AtomicFileOperationsImpl(String filePath, FileType fileType) { - this.filePath = filePath; - - this.fileType = fileType; - } - - @Override public DataInputStream openForRead() throws IOException { - return FileFactory.getDataInputStream(filePath, fileType); - } - - @Override public DataOutputStream openForWrite(FileWriteOperation operation) throws IOException { - - filePath = filePath.replace("\\", "/"); - - tempWriteFilePath = filePath + CarbonCommonConstants.TEMPWRITEFILEEXTENSION; - - if (FileFactory.isFileExist(tempWriteFilePath, fileType)) { - FileFactory.getCarbonFile(tempWriteFilePath, fileType).delete(); - } - - FileFactory.createNewFile(tempWriteFilePath, fileType); - - dataOutStream = FileFactory.getDataOutputStream(tempWriteFilePath, fileType); - - return dataOutStream; - - } - - /* (non-Javadoc) - * @see com.huawei.unibi.carbon.datastorage.store.fileperations.AtomicFileOperations#close() - */ - @Override public void close() throws IOException { - - if (null != dataOutStream) { - dataOutStream.close(); - - CarbonFile tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType); - - if (!tempFile.renameForce(filePath)) { - throw new IOException("temporary file renaming failed, src=" - + tempFile.getPath() + ", dest=" + filePath); - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/FileWriteOperation.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/FileWriteOperation.java b/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/FileWriteOperation.java deleted file mode 100644 index 8c228b9..0000000 --- a/processing/src/main/java/org/apache/carbondata/lcm/fileoperations/FileWriteOperation.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.lcm.fileoperations; - -public enum FileWriteOperation { - - APPEND, OVERWRITE -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/AbstractCarbonLock.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/AbstractCarbonLock.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/AbstractCarbonLock.java deleted file mode 100644 index 8054c41..0000000 --- a/processing/src/main/java/org/apache/carbondata/lcm/locks/AbstractCarbonLock.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.lcm.locks; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.util.CarbonProperties; - -/** - * This is the abstract class of the lock implementations.This handles the - * retrying part of the locking. - */ -public abstract class AbstractCarbonLock implements ICarbonLock { - private int retryCount; - - private int retryTimeout; - - public abstract boolean lock(); - - /** - * API for enabling the locking of file with retries. - */ - public boolean lockWithRetries() { - try { - for (int i = 0; i < retryCount; i++) { - if (lock()) { - return true; - } else { - Thread.sleep(retryTimeout * 1000L); - } - } - } catch (InterruptedException e) { - return false; - } - return false; - } - - /** - * Initializes the retry count and retry timeout. - * This will determine how many times to retry to acquire lock and the retry timeout. - */ - protected void initRetry() { - String retries = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK); - try { - retryCount = Integer.parseInt(retries); - } catch (NumberFormatException e) { - retryCount = CarbonCommonConstants.NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK_DEFAULT; - } - - String maxTimeout = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK); - try { - retryTimeout = Integer.parseInt(maxTimeout); - } catch (NumberFormatException e) { - retryTimeout = CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK_DEFAULT; - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockFactory.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockFactory.java deleted file mode 100644 index fe98c48..0000000 --- a/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockFactory.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.lcm.locks; - -import org.apache.carbondata.core.carbon.CarbonTableIdentifier; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.util.CarbonProperties; - -/** - * This class is a Lock factory class which is used to provide lock objects. - * Using this lock object client can request the lock and unlock. - */ -public class CarbonLockFactory { - - /** - * lockTypeConfigured to check if zookeeper feature is enabled or not for carbon. - */ - private static String lockTypeConfigured; - - static { - CarbonLockFactory.getLockTypeConfigured(); - } - - /** - * This method will determine the lock type. - * - * @param tableIdentifier - * @param lockFile - * @return - */ - public static ICarbonLock getCarbonLockObj(CarbonTableIdentifier tableIdentifier, - String lockFile) { - switch (lockTypeConfigured.toUpperCase()) { - case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL: - return new LocalFileLock(tableIdentifier, lockFile); - - case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER: - return new ZooKeeperLocking(tableIdentifier, lockFile); - - case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS: - return new HdfsFileLock(tableIdentifier, lockFile); - - default: - throw new UnsupportedOperationException("Not supported the lock type"); - } - } - - /** - * - * @param locFileLocation - * @param lockFile - * @return carbon lock - */ - public static ICarbonLock getCarbonLockObj(String locFileLocation, String lockFile) { - switch (lockTypeConfigured.toUpperCase()) { - case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL: - return new LocalFileLock(locFileLocation, lockFile); - - case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER: - return new ZooKeeperLocking(locFileLocation, lockFile); - - case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS: - return new HdfsFileLock(locFileLocation, lockFile); - - default: - throw new UnsupportedOperationException("Not supported the lock type"); - } - } - - /** - * This method will set the zookeeper status whether zookeeper to be used for locking or not. - */ - private static void getLockTypeConfigured() { - lockTypeConfigured = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.LOCK_TYPE_DEFAULT); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java deleted file mode 100644 index 31ac8e5..0000000 --- a/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.lcm.locks; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; - -/** - * This class contains all carbon lock utilities - */ -public class CarbonLockUtil { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(CarbonLockUtil.class.getName()); - - /** - * unlocks given file - * - * @param carbonLock - */ - public static void fileUnlock(ICarbonLock carbonLock, String locktype) { - if (carbonLock.unlock()) { - if (locktype.equals(LockUsage.METADATA_LOCK)) { - LOGGER.info("Metadata lock has been successfully released"); - } else if (locktype.equals(LockUsage.TABLE_STATUS_LOCK)) { - LOGGER.info("Table status lock has been successfully released"); - } - else if (locktype.equals(LockUsage.CLEAN_FILES_LOCK)) { - LOGGER.info("Clean files lock has been successfully released"); - } - else if (locktype.equals(LockUsage.DELETE_SEGMENT_LOCK)) { - LOGGER.info("Delete segments lock has been successfully released"); - } - } else { - if (locktype.equals(LockUsage.METADATA_LOCK)) { - LOGGER.error("Not able to release the metadata lock"); - } else if (locktype.equals(LockUsage.TABLE_STATUS_LOCK)) { - LOGGER.error("Not able to release the table status lock"); - } - else if (locktype.equals(LockUsage.CLEAN_FILES_LOCK)) { - LOGGER.info("Not able to release the clean files lock"); - } - else if (locktype.equals(LockUsage.DELETE_SEGMENT_LOCK)) { - LOGGER.info("Not able to release the delete segments lock"); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/HdfsFileLock.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/HdfsFileLock.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/HdfsFileLock.java deleted file mode 100644 index 9770d7e..0000000 --- a/processing/src/main/java/org/apache/carbondata/lcm/locks/HdfsFileLock.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.lcm.locks; - -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.carbon.CarbonTableIdentifier; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.store.impl.FileFactory; -import org.apache.carbondata.core.util.CarbonProperties; - -import org.apache.hadoop.conf.Configuration; - -/** - * This class is used to handle the HDFS File locking. - * This is acheived using the concept of acquiring the data out stream using Append option. - */ -public class HdfsFileLock extends AbstractCarbonLock { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(HdfsFileLock.class.getName()); - /** - * location hdfs file location - */ - private String location; - - private DataOutputStream dataOutputStream; - - public static String tmpPath; - - static { - Configuration conf = new Configuration(true); - String hdfsPath = conf.get(CarbonCommonConstants.FS_DEFAULT_FS); - // By default, we put the hdfs lock meta file for one table inside this table's store folder. - // If can not get the STORE_LOCATION, then use hadoop.tmp.dir . - tmpPath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION, - System.getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION)); - if (!tmpPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) { - tmpPath = hdfsPath + tmpPath; - } - } - - /** - * @param lockFileLocation - * @param lockFile - */ - public HdfsFileLock(String lockFileLocation, String lockFile) { - this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + lockFileLocation - + CarbonCommonConstants.FILE_SEPARATOR + lockFile; - LOGGER.info("HDFS lock path:"+this.location); - initRetry(); - } - - /** - * @param tableIdentifier - * @param lockFile - */ - public HdfsFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) { - this(tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier - .getTableName(), lockFile); - } - - /* (non-Javadoc) - * @see org.apache.carbondata.core.locks.ICarbonLock#lock() - */ - @Override public boolean lock() { - try { - if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) { - FileFactory.createNewLockFile(location, FileFactory.getFileType(location)); - } - dataOutputStream = - FileFactory.getDataOutputStreamUsingAppend(location, FileFactory.getFileType(location)); - - return true; - - } catch (IOException e) { - return false; - } - } - - /* (non-Javadoc) - * @see org.apache.carbondata.core.locks.ICarbonLock#unlock() - */ - @Override public boolean unlock() { - if (null != dataOutputStream) { - try { - dataOutputStream.close(); - } catch (IOException e) { - return false; - } finally { - if (FileFactory.getCarbonFile(location, FileFactory.getFileType(location)).delete()) { - LOGGER.info("Deleted the lock file " + location); - } else { - LOGGER.error("Not able to delete the lock file " + location); - } - } - } - return true; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/ICarbonLock.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/ICarbonLock.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/ICarbonLock.java deleted file mode 100644 index a9a7f77..0000000 --- a/processing/src/main/java/org/apache/carbondata/lcm/locks/ICarbonLock.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.lcm.locks; - -/** - * Carbon Lock Interface which handles the locking and unlocking. - */ -public interface ICarbonLock { - - /** - * Does the unlocking of the acquired lock. - * - * @return - */ - boolean unlock(); - - /** - * This will acquire the lock and if it doesnt get then it will retry after the confiured time. - * - * @return - */ - boolean lockWithRetries(); - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/LocalFileLock.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/LocalFileLock.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/LocalFileLock.java deleted file mode 100644 index f1103b1..0000000 --- a/processing/src/main/java/org/apache/carbondata/lcm/locks/LocalFileLock.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.lcm.locks; - -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.channels.FileChannel; -import java.nio.channels.FileLock; -import java.nio.channels.OverlappingFileLockException; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.carbon.CarbonTableIdentifier; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.store.impl.FileFactory; - -/** - * This class handles the file locking in the local file system. - * This will be handled using the file channel lock API. - */ -public class LocalFileLock extends AbstractCarbonLock { - /** - * location is the location of the lock file. - */ - private String location; - - /** - * fileOutputStream of the local lock file - */ - private FileOutputStream fileOutputStream; - - /** - * channel is the FileChannel of the lock file. - */ - private FileChannel channel; - - /** - * fileLock NIO FileLock Object - */ - private FileLock fileLock; - - /** - * lock file - */ - private String lockFile; - - public static final String tmpPath; - - private String lockFilePath; - - /** - * LOGGER for logging the messages. - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(LocalFileLock.class.getName()); - - static { - tmpPath = System.getProperty("java.io.tmpdir"); - } - - /** - * @param lockFileLocation - * @param lockFile - */ - public LocalFileLock(String lockFileLocation, String lockFile) { - this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + lockFileLocation; - this.lockFile = lockFile; - initRetry(); - } - - /** - * @param tableIdentifier - * @param lockFile - */ - public LocalFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) { - this(tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier - .getTableName(), lockFile); - initRetry(); - } - - /** - * Lock API for locking of the file channel of the lock file. - * - * @return - */ - @Override public boolean lock() { - try { - if (!FileFactory.isFileExist(location, FileFactory.getFileType(tmpPath))) { - FileFactory.mkdirs(location, FileFactory.getFileType(tmpPath)); - } - lockFilePath = location + CarbonCommonConstants.FILE_SEPARATOR + - lockFile; - if (!FileFactory.isFileExist(lockFilePath, FileFactory.getFileType(location))) { - FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(location)); - } - - fileOutputStream = new FileOutputStream(lockFilePath); - channel = fileOutputStream.getChannel(); - try { - fileLock = channel.tryLock(); - } catch (OverlappingFileLockException e) { - return false; - } - if (null != fileLock) { - return true; - } else { - return false; - } - } catch (IOException e) { - return false; - } - - } - - /** - * Unlock API for unlocking of the acquired lock. - * - * @return - */ - @Override public boolean unlock() { - boolean status; - try { - if (null != fileLock) { - fileLock.release(); - } - status = true; - } catch (IOException e) { - status = false; - } finally { - if (null != fileOutputStream) { - try { - fileOutputStream.close(); - // deleting the lock file after releasing the lock. - if (FileFactory.getCarbonFile(lockFilePath, FileFactory.getFileType(lockFilePath)) - .delete()) { - LOGGER.info("Successfully deleted the lock file " + lockFilePath); - } else { - LOGGER.error("Not able to delete the lock file " + lockFilePath); - } - } catch (IOException e) { - LOGGER.error(e.getMessage()); - } - } - } - return status; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java deleted file mode 100644 index 9b84042..0000000 --- a/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.lcm.locks; - -/** - * This enum is used to define the usecase of the lock. - * Each enum value is one specific lock case. - */ -public class LockUsage { - public static final String LOCK = ".lock"; - public static final String METADATA_LOCK = "meta.lock"; - public static final String COMPACTION_LOCK = "compaction.lock"; - public static final String SYSTEMLEVEL_COMPACTION_LOCK = "system_level_compaction.lock"; - public static final String TABLE_STATUS_LOCK = "tablestatus.lock"; - public static final String DELETE_SEGMENT_LOCK = "delete_segment.lock"; - public static final String CLEAN_FILES_LOCK = "clean_files.lock"; - public static final String DROP_TABLE_LOCK = "droptable.lock"; - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/ZooKeeperLocking.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/ZooKeeperLocking.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/ZooKeeperLocking.java deleted file mode 100644 index c45fbb0..0000000 --- a/processing/src/main/java/org/apache/carbondata/lcm/locks/ZooKeeperLocking.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.lcm.locks; - -import java.io.File; -import java.util.Collections; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.carbon.CarbonTableIdentifier; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.util.CarbonProperties; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; - -/** - * For Handling the zookeeper locking implementation - */ -public class ZooKeeperLocking extends AbstractCarbonLock { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(ZooKeeperLocking.class.getName()); - - /** - * zk is the zookeeper client instance - */ - private static ZooKeeper zk; - - /** - * zooKeeperLocation is the location in the zoo keeper file system where the locks will be - * maintained. - */ - private static final String zooKeeperLocation = CarbonCommonConstants.ZOOKEEPER_LOCATION; - - /** - * Unique folder for each table with DatabaseName_TableName - */ - private final String tableIdFolder; - - /** - * lockName is the name of the lock to use. This name should be same for every process that want - * to share the same lock - */ - private String lockName; - - /** - * lockPath is the unique path created for the each instance of the carbon lock. - */ - private String lockPath; - - private String lockTypeFolder; - - public ZooKeeperLocking(CarbonTableIdentifier tableIdentifier, String lockFile) { - this(tableIdentifier.getDatabaseName() + File.separator + tableIdentifier.getTableName(), - lockFile); - } - - /** - * @param lockLocation - * @param lockFile - */ - public ZooKeeperLocking(String lockLocation, String lockFile) { - this.lockName = lockFile; - this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation; - - String zooKeeperUrl = - CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ZOOKEEPER_URL); - zk = ZookeeperInit.getInstance(zooKeeperUrl).getZookeeper(); - - this.lockTypeFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation - + CarbonCommonConstants.FILE_SEPARATOR + lockFile; - try { - createBaseNode(); - // if exists returns null then path doesnt exist. so creating. - if (null == zk.exists(this.tableIdFolder, true)) { - createRecursivly(this.tableIdFolder); - } - // if exists returns null then path doesnt exist. so creating. - if (null == zk.exists(this.lockTypeFolder, true)) { - zk.create(this.lockTypeFolder, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - } catch (KeeperException | InterruptedException e) { - LOGGER.error(e, e.getMessage()); - } - initRetry(); - } - - /** - * Creating a znode in which all the znodes (lock files )are maintained. - */ - private void createBaseNode() throws KeeperException, InterruptedException { - if (null == zk.exists(zooKeeperLocation, true)) { - // creating a znode in which all the znodes (lock files )are maintained. - zk.create(zooKeeperLocation, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - } - - /** - * Create zookeepr node if not exist - * @param path - * @throws KeeperException - * @throws InterruptedException - */ - private void createRecursivly(String path) throws KeeperException, InterruptedException { - try { - if (zk.exists(path, true) == null && path.length() > 0) { - String temp = path.substring(0, path.lastIndexOf(File.separator)); - createRecursivly(temp); - zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } else { - return; - } - } catch (KeeperException e) { - throw e; - } catch (InterruptedException e) { - throw e; - } - - } - /** - * Handling of the locking mechanism using zoo keeper. - */ - @Override public boolean lock() { - try { - // create the lock file with lockName. - lockPath = - zk.create(this.lockTypeFolder + CarbonCommonConstants.FILE_SEPARATOR + lockName, null, - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); - - // get the children present in zooKeeperLocation. - List<String> nodes = zk.getChildren(this.lockTypeFolder, null); - - // sort the childrens - Collections.sort(nodes); - - // here the logic is , for each lock request zookeeper will create a file ending with - // incremental digits. - // so first request will be 00001 next is 00002 and so on. - // if the current request is 00002 and already one previous request(00001) is present then get - // children will give both nodes. - // after the sort we are checking if the lock path is first or not .if it is first then lock - // has been acquired. - - if (lockPath.endsWith(nodes.get(0))) { - return true; - } else { - // if locking failed then deleting the created lock as next time again new lock file will be - // created. - zk.delete(lockPath, -1); - return false; - } - } catch (KeeperException | InterruptedException e) { - LOGGER.error(e, e.getMessage()); - return false; - } - } - - /** - * @return status where lock file is unlocked or not. - */ - @Override public boolean unlock() { - try { - // exists will return null if the path doesn't exists. - if (null != zk.exists(lockPath, true)) { - zk.delete(lockPath, -1); - lockPath = null; - } - } catch (KeeperException | InterruptedException e) { - LOGGER.error(e, e.getMessage()); - return false; - } - return true; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/processing/src/main/java/org/apache/carbondata/lcm/locks/ZookeeperInit.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/ZookeeperInit.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/ZookeeperInit.java deleted file mode 100644 index 03827b6..0000000 --- a/processing/src/main/java/org/apache/carbondata/lcm/locks/ZookeeperInit.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.lcm.locks; - -import java.io.IOException; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; - -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; - -/** - * This is a singleton class for initialization of zookeeper client. - */ -public class ZookeeperInit { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(ZookeeperInit.class.getName()); - - private static ZookeeperInit zooKeeperInit; - /** - * zk is the zookeeper client instance - */ - private ZooKeeper zk; - - private ZookeeperInit(String zooKeeperUrl) { - - int sessionTimeOut = 100000; - try { - zk = new ZooKeeper(zooKeeperUrl, sessionTimeOut, new DummyWatcher()); - - } catch (IOException e) { - LOGGER.error(e.getMessage()); - } - - } - - public static ZookeeperInit getInstance(String zooKeeperUrl) { - - if (null == zooKeeperInit) { - synchronized (ZookeeperInit.class) { - if (null == zooKeeperInit) { - LOGGER.info("Initiating Zookeeper client."); - zooKeeperInit = new ZookeeperInit(zooKeeperUrl); - } - } - } - return zooKeeperInit; - - } - - public static ZookeeperInit getInstance() { - return zooKeeperInit; - } - - public ZooKeeper getZookeeper() { - return zk; - } - - private static class DummyWatcher implements Watcher { - public void process(WatchedEvent event) { - } - } -}