This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git

The following commit(s) were added to refs/heads/master by this push:
     new 8196dc2  [CARBONDATA-3812] Set output metrics for data load spark job
8196dc2 is described below

commit 8196dc26c7e0924fad54aba558df9dbea5000131
Author: QiangCai <qiang...@qq.com>
AuthorDate: Sat May 9 11:52:24 2020 +0800

    [CARBONDATA-3812] Set output metrics for data load spark job

    Why is this PR needed?
    Data load jobs are missing output metrics.
    See JIRA CARBONDATA-3812 for details.

    What changes were proposed in this PR?
    Refactor OutputFilesInfoHolder into DataLoadMetrics.
    Add two metrics: numOutputBytes and numOutputRows.
    (A short usage sketch of the new metrics follows the diff below.)

    Does this PR introduce any user interface change?
    No

    Is any new testcase added?
    No

    This closes #3757
---
 .../apache/carbondata/core/util/CarbonUtil.java    | 13 ++---
 ...utFilesInfoHolder.java => DataLoadMetrics.java} | 60 ++++++++++++++--------
 .../hadoop/api/CarbonOutputCommitter.java          |  2 +-
 .../hadoop/api/CarbonTableOutputFormat.java        | 20 ++++----
 .../carbondata/hive/MapredCarbonOutputFormat.java  |  2 +-
 .../carbondata/hive/util/HiveCarbonUtil.java       |  4 +-
 .../spark/load/DataLoadProcessBuilderOnSpark.scala |  4 ++
 .../spark/rdd/NewCarbonDataLoadRDD.scala           |  5 ++
 .../spark/sql/events/MergeIndexEventListener.scala |  9 ++--
 .../CarbonAlterTableCompactionCommand.scala        |  4 +-
 .../datasources/SparkCarbonTableFormat.scala       |  5 +-
 .../org/apache/spark/sql/util/SparkSQLUtil.scala   |  9 ++++
 .../loading/CarbonDataLoadConfiguration.java       | 12 ++---
 .../processing/loading/DataLoadProcessBuilder.java |  2 +-
 .../processing/loading/model/CarbonLoadModel.java  | 16 +++---
 .../loading/model/CarbonLoadModelBuilder.java      |  3 ++
 .../CarbonRowDataWriterProcessorStepImpl.java      |  3 ++
 .../loading/steps/DataWriterProcessorStepImpl.java |  3 ++
 .../store/CarbonFactDataHandlerModel.java          | 17 +++---
 .../store/writer/AbstractFactDataWriter.java       | 12 ++---
 20 files changed, 125 insertions(+), 80 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index ed33aa2..57bb093 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2628,25 +2628,26 @@ public final class CarbonUtil {
   public static void copyCarbonDataFileToCarbonStorePath(
       String localFilePath, String targetPath, long fileSizeInBytes,
-      OutputFilesInfoHolder outputFilesInfoHolder) throws CarbonDataWriterException {
+      DataLoadMetrics metrics) throws CarbonDataWriterException {
     if (targetPath.endsWith(".tmp") && localFilePath
         .endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
       // for partition case, write carbondata file directly to final path, keep index in temp path.
       // This can improve the commit job performance on s3a.
       targetPath = targetPath.substring(0, targetPath.lastIndexOf("/"));
-      if (outputFilesInfoHolder != null) {
-        outputFilesInfoHolder.addToPartitionPath(targetPath);
+      if (metrics != null) {
+        metrics.addToPartitionPath(targetPath);
       }
     }
     long targetSize =
         copyCarbonDataFileToCarbonStorePath(localFilePath, targetPath, fileSizeInBytes);
-    if (outputFilesInfoHolder != null) {
+    if (metrics != null) {
       // Storing the number of files written by each task.
-      outputFilesInfoHolder.incrementCount();
+      metrics.incrementCount();
       // Storing the files written by each task.
-      outputFilesInfoHolder.addToOutputFiles(targetPath + localFilePath
+      metrics.addToOutputFiles(targetPath + localFilePath
          .substring(localFilePath.lastIndexOf(File.separator)) + ":" + targetSize);
+      metrics.addOutputBytes(targetSize);
     }
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/OutputFilesInfoHolder.java b/core/src/main/java/org/apache/carbondata/core/util/DataLoadMetrics.java
similarity index 73%
rename from core/src/main/java/org/apache/carbondata/core/util/OutputFilesInfoHolder.java
rename to core/src/main/java/org/apache/carbondata/core/util/DataLoadMetrics.java
index 24d3ecd..273c9eb 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/OutputFilesInfoHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataLoadMetrics.java
@@ -21,10 +21,10 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
-public class OutputFilesInfoHolder implements Serializable {
-
-  private static final long serialVersionUID = -1401375818456585241L;
-
+/**
+ * store data loading metrics
+ */
+public class DataLoadMetrics implements Serializable {
   // stores the count of files written per task
   private int fileCount;
 
@@ -37,6 +37,38 @@ public class OutputFilesInfoHolder implements Serializable {
 
   private long mergeIndexSize;
 
+  private long numOutputBytes = 0L;
+
+  private long numOutputRows = 0L;
+
+  public synchronized int getFileCount() {
+    return fileCount;
+  }
+
+  public synchronized List<String> getOutputFiles() {
+    return outputFiles;
+  }
+
+  public synchronized List<String> getPartitionPath() {
+    return partitionPath;
+  }
+
+  public long getMergeIndexSize() {
+    return mergeIndexSize;
+  }
+
+  public void setMergeIndexSize(long mergeIndexSize) {
+    this.mergeIndexSize = mergeIndexSize;
+  }
+
+  public synchronized long getNumOutputBytes() {
+    return numOutputBytes;
+  }
+
+  public synchronized long getNumOutputRows() {
+    return numOutputRows;
+  }
+
   public synchronized void incrementCount() {
     // can call in multiple threads in single task
     fileCount++;
@@ -56,23 +88,11 @@ public class OutputFilesInfoHolder implements Serializable {
     partitionPath.add(path);
   }
 
-  public int getFileCount() {
-    return fileCount;
-  }
-
-  public List<String> getOutputFiles() {
-    return outputFiles;
+  public synchronized void addOutputBytes(long numOutputBytes) {
+    this.numOutputBytes += numOutputBytes;
   }
 
-  public List<String> getPartitionPath() {
-    return partitionPath;
-  }
-
-  public long getMergeIndexSize() {
-    return mergeIndexSize;
-  }
-
-  public void setMergeIndexSize(long mergeIndexSize) {
-    this.mergeIndexSize = mergeIndexSize;
+  public synchronized void addOutputRows(long numOutputRows) {
+    this.numOutputRows += numOutputRows;
   }
 }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 33979e5..02c8d4c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -285,7 +285,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     String segmentFileName = SegmentFileStore.genSegmentFileName(
         loadModel.getSegmentId(), String.valueOf(loadModel.getFactTimeStamp()));
     newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
-    newMetaEntry.setIndexSize("" + loadModel.getOutputFilesInfoHolder().getMergeIndexSize());
+    newMetaEntry.setIndexSize("" + loadModel.getMetrics().getMergeIndexSize());
     if (!StringUtils.isEmpty(size)) {
       newMetaEntry.setDataSize(size);
     }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 200eb44..ebac3d4 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -38,9 +38,9 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
+import org.apache.carbondata.core.util.DataLoadMetrics;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.processing.loading.ComplexDelimitersEnum;
@@ -242,7 +242,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
   public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
       final TaskAttemptContext taskAttemptContext) throws IOException {
     final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
-    loadModel.setOutputFilesInfoHolder(new OutputFilesInfoHolder());
+    loadModel.setMetrics(new DataLoadMetrics());
     String appName =
         taskAttemptContext.getConfiguration().get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME);
     if (null != appName) {
@@ -317,7 +317,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf));
     model.setTableName(CarbonTableOutputFormat.getTableName(conf));
     model.setCarbonTransactionalTable(true);
-    model.setOutputFilesInfoHolder(new OutputFilesInfoHolder());
+    model.setMetrics(new DataLoadMetrics());
     CarbonTable carbonTable = getCarbonTable(conf);
 
     // global dictionary is not supported since 2.0
@@ -482,17 +482,17 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
         // clean up the folders and files created locally for data load operation
         TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
       }
-      OutputFilesInfoHolder outputFilesInfoHolder = loadModel.getOutputFilesInfoHolder();
-      if (null != outputFilesInfoHolder) {
+      DataLoadMetrics metrics = loadModel.getMetrics();
+      if (null != metrics) {
         taskAttemptContext.getConfiguration()
-            .set("carbon.number.of.output.files", outputFilesInfoHolder.getFileCount() + "");
-        if (outputFilesInfoHolder.getOutputFiles() != null) {
+            .set("carbon.number.of.output.files", metrics.getFileCount() + "");
+        if (metrics.getOutputFiles() != null) {
           appendConfiguration(taskAttemptContext.getConfiguration(), "carbon.output.files.name",
-              outputFilesInfoHolder.getOutputFiles());
+              metrics.getOutputFiles());
         }
-        if (outputFilesInfoHolder.getPartitionPath() != null) {
+        if (metrics.getPartitionPath() != null) {
           appendConfiguration(taskAttemptContext.getConfiguration(),
-              "carbon.output.partitions.name", outputFilesInfoHolder.getPartitionPath());
+              "carbon.output.partitions.name", metrics.getPartitionPath());
         }
       }
       LOG.info("Closed writer task " + taskAttemptContext.getTaskAttemptID());
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
index 3a8c7a7..a486fc0 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
@@ -102,7 +102,7 @@ public class MapredCarbonOutputFormat<T> extends CarbonTableOutputFormat
         partitionInfo != null ? partitionInfo.getColumnSchemaList().size() : 0;
     String finalOutputPath = FileFactory.getCarbonFile(finalOutPath.toString()).getAbsolutePath();
     if (carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().isHivePartitionTable()) {
-      carbonLoadModel.getOutputFilesInfoHolder().addToPartitionPath(finalOutputPath);
+      carbonLoadModel.getMetrics().addToPartitionPath(finalOutputPath);
       context.getConfiguration().set("carbon.outputformat.writepath", finalOutputPath);
     }
     CarbonTableOutputFormat.setLoadModel(jc, carbonLoadModel);
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
index b611edb..29b2e4d 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
@@ -49,7 +49,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.ThriftWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -151,7 +151,7 @@ public class HiveCarbonUtil {
       throw new RuntimeException(e);
     }
     loadModel.setSkipParsers();
-    loadModel.setOutputFilesInfoHolder(new OutputFilesInfoHolder());
+    loadModel.setMetrics(new DataLoadMetrics());
     return loadModel;
   }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 8fb3002..55eee11 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -165,8 +165,10 @@ object DataLoadProcessBuilderOnSpark {
     sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
       setTaskListener(model.getTableName, model.getSegmentId, segmentMetaDataAccumulator)
       val loadModel = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
+      loadModel.setMetrics(new DataLoadMetrics())
       DataLoadProcessorStepOnSpark.writeFunc(
         rows, context.partitionId, loadModel, writeStepRowCounter, conf.value.value)
+      SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, loadModel.getMetrics)
     })
 
     // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
@@ -252,8 +254,10 @@ object DataLoadProcessBuilderOnSpark {
     sc.runJob(newRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
       setTaskListener(model.getTableName, model.getSegmentId, segmentMetaDataAccumulator)
       val loadModel = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
+      loadModel.setMetrics(new DataLoadMetrics())
       DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, loadModel,
         writeStepRowCounter, conf.value.value)
+      SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, loadModel.getMetrics)
     })
     // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
     // not have any functional impact as spark automatically monitors the cache usage on each node
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 68f994c..521f105 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -33,6 +33,7 @@ import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.util.{CollectionAccumulator, SparkUtil}
 
 import org.apache.carbondata.common.CarbonIterator
@@ -160,6 +161,7 @@ class NewCarbonDataLoadRDD[K, V](
         executor.execute(model,
           loader.storeLocation,
           recordReaders)
+        executor.close()
       } catch {
         case e: NoRetryException =>
           loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
@@ -174,6 +176,7 @@ class NewCarbonDataLoadRDD[K, V](
           LOGGER.error(e)
           throw e
       } finally {
+        SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, model.getMetrics)
         // clean up the folders and files created locally for data load operation
         TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
@@ -316,6 +319,7 @@ class NewDataFrameLoaderRDD[K, V](
             carbonLoadModel.getTableName,
             carbonLoadModel.getSegment.getSegmentNo))
         executor.execute(model, loader.storeLocation, recordReaders.toArray)
+        executor.close()
       } catch {
         case e: NoRetryException =>
           loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
@@ -328,6 +332,7 @@ class NewDataFrameLoaderRDD[K, V](
           LOGGER.error(e)
           throw e
       } finally {
+        SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, model.getMetrics)
        // clean up the folders and files created locally for data load operation
        TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
        // in case of failure the same operation will be re-tried several times.
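
Note on the Spark-side wiring above: the two data load paths in
DataLoadProcessBuilderOnSpark and both RDDs in NewCarbonDataLoadRDD repeat the
same per-task pattern. The sketch below condenses it; it assumes the values
from the surrounding code (sc, sortRDD, model, modelBroadcast,
writeStepRowCounter, conf, segmentMetaDataAccumulator) are in scope.

    sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
      setTaskListener(model.getTableName, model.getSegmentId, segmentMetaDataAccumulator)
      // each task works on its own copy of the load model ...
      val loadModel = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
      // ... with a fresh DataLoadMetrics, so byte/row counters are per task
      loadModel.setMetrics(new DataLoadMetrics())
      // the write step accumulates numOutputBytes and numOutputRows as it flushes files
      DataLoadProcessorStepOnSpark.writeFunc(
        rows, context.partitionId, loadModel, writeStepRowCounter, conf.value.value)
      // publish the accumulated totals to Spark's task-level output metrics
      SparkSQLUtil.setOutputMetrics(context.taskMetrics().outputMetrics, loadModel.getMetrics)
    })
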
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index d067448..14fa4ac 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -29,10 +29,9 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.MergeIndexUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.index.Segment
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{ObjectSerializationUtil, OutputFilesInfoHolder}
+import org.apache.carbondata.core.util.{DataLoadMetrics, ObjectSerializationUtil}
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -86,9 +85,9 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
               },
               currPartitionSpec = currPartitionSpecOption
             )
-            val outputFilesInfoHolder = new OutputFilesInfoHolder
-            loadModel.setOutputFilesInfoHolder(outputFilesInfoHolder)
-            loadModel.getOutputFilesInfoHolder.setMergeIndexSize(indexSize)
+            val metrics = new DataLoadMetrics
+            metrics.setMergeIndexSize(indexSize)
+            loadModel.setMetrics(metrics)
             LOGGER.info("Total time taken for merge index "
               + (System.currentTimeMillis() - startTime))
             // clear Block dataMap Cache
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index aa1d294..2224943 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -42,8 +42,7 @@ import org.apache.carbondata.core.metadata.ColumnarFormatVersion
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataLoadMetrics}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.view.{MVSchema, MVStatus}
 import org.apache.carbondata.events._
@@ -177,6 +176,7 @@ case class CarbonAlterTableCompactionCommand(
         .getOrElse(CarbonCommonConstants.COMPRESSOR,
           CompressorFactory.getInstance().getCompressor.getName)
     carbonLoadModel.setColumnCompressor(columnCompressor)
+    carbonLoadModel.setMetrics(new DataLoadMetrics())
 
     var storeLocation = System.getProperty("java.io.tmpdir")
     storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index da46177..6ba1702 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types._
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.execution.command.management.CommonLoadUtils
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
@@ -46,7 +45,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverter, DataTypeConverterImpl, DataTypeUtil, ObjectSerializationUtil, OutputFilesInfoHolder, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonProperties, DataLoadMetrics, DataTypeConverter, DataTypeConverterImpl, DataTypeUtil, ObjectSerializationUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
@@ -96,7 +95,7 @@ with Serializable {
         .getOrElse(CarbonCommonConstants.COMPRESSOR,
           CompressorFactory.getInstance().getCompressor.getName)
     model.setColumnCompressor(columnCompressor)
-    model.setOutputFilesInfoHolder(new OutputFilesInfoHolder())
+    model.setMetrics(new DataLoadMetrics())
 
     val carbonProperty = CarbonProperties.getInstance()
     val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 435cb58..8bf3483 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.util
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -35,6 +36,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.DataLoadMetrics
 
 object SparkSQLUtil {
   def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
@@ -167,4 +169,11 @@ object SparkSQLUtil {
      */
     sparkSession.sqlContext.table(carbonTable.getTableName)
   }
+
+  def setOutputMetrics(outputMetrics: OutputMetrics, dataLoadMetrics: DataLoadMetrics): Unit = {
+    if (dataLoadMetrics != null && outputMetrics != null) {
+      outputMetrics.setBytesWritten(dataLoadMetrics.getNumOutputBytes)
+      outputMetrics.setRecordsWritten(dataLoadMetrics.getNumOutputRows)
+    }
+  }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index f56be66..9d30331 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
 
 public class CarbonDataLoadConfiguration {
 
@@ -98,7 +98,7 @@ public class CarbonDataLoadConfiguration {
 
   private int numberOfLoadingCores;
 
-  private OutputFilesInfoHolder outputFilesInfoHolder;
+  private DataLoadMetrics metrics;
 
   /**
    * Whether index columns are present. This flag should be set only when all the schema
@@ -378,12 +378,12 @@ public class CarbonDataLoadConfiguration {
     this.segmentPath = segmentPath;
   }
 
-  public OutputFilesInfoHolder getOutputFilesInfoHolder() {
-    return outputFilesInfoHolder;
+  public DataLoadMetrics getMetrics() {
+    return metrics;
   }
 
-  public void setOutputFilesInfoHolder(OutputFilesInfoHolder outputFilesInfoHolder) {
-    this.outputFilesInfoHolder = outputFilesInfoHolder;
+  public void setMetrics(DataLoadMetrics metrics) {
+    this.metrics = metrics;
   }
 
   public boolean isIndexColumnsPresent() {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index a412f9a..100f772 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -210,7 +210,7 @@ public final class DataLoadProcessBuilder {
     }
     configuration.setSkipParsers(loadModel.isSkipParsers());
     configuration.setTaskNo(loadModel.getTaskNo());
-    configuration.setOutputFilesInfoHolder(loadModel.getOutputFilesInfoHolder());
+    configuration.setMetrics(loadModel.getMetrics());
     String[] complexDelimiters = new String[loadModel.getComplexDelimiters().size()];
     loadModel.getComplexDelimiters().toArray(complexDelimiters);
     configuration
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 1aba14c..9d8d792 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -28,7 +28,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 public class CarbonLoadModel implements Serializable {
@@ -226,7 +226,7 @@ public class CarbonLoadModel implements Serializable {
    */
   private int bucketId;
 
-  private OutputFilesInfoHolder outputFilesInfoHolder;
+  private DataLoadMetrics metrics;
 
   private boolean skipParsers = false;
 
@@ -428,7 +428,7 @@ public class CarbonLoadModel implements Serializable {
     copy.rangePartitionColumn = rangePartitionColumn;
     copy.scaleFactor = scaleFactor;
     copy.totalSize = totalSize;
-    copy.outputFilesInfoHolder = outputFilesInfoHolder;
+    copy.metrics = metrics;
     copy.isLoadWithoutConverterWithoutReArrangeStep = isLoadWithoutConverterWithoutReArrangeStep;
     return copy;
   }
@@ -482,7 +482,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.rangePartitionColumn = rangePartitionColumn;
     copyObj.scaleFactor = scaleFactor;
     copyObj.totalSize = totalSize;
-    copyObj.outputFilesInfoHolder = outputFilesInfoHolder;
+    copyObj.metrics = metrics;
     copyObj.isLoadWithoutConverterStep = isLoadWithoutConverterStep;
     copyObj.isLoadWithoutConverterWithoutReArrangeStep = isLoadWithoutConverterWithoutReArrangeStep;
     return copyObj;
@@ -881,12 +881,12 @@ public class CarbonLoadModel implements Serializable {
     return scaleFactor;
   }
 
-  public OutputFilesInfoHolder getOutputFilesInfoHolder() {
-    return outputFilesInfoHolder;
+  public DataLoadMetrics getMetrics() {
+    return metrics;
   }
 
-  public void setOutputFilesInfoHolder(OutputFilesInfoHolder outputFilesInfoHolder) {
-    this.outputFilesInfoHolder = outputFilesInfoHolder;
+  public void setMetrics(DataLoadMetrics metrics) {
+    this.metrics = metrics;
   }
 
   public boolean isIndexColumnsPresent() {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 59e6345..33eefa5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataLoadMetrics;
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
@@ -296,6 +297,8 @@ public class CarbonLoadModelBuilder {
     validateAndSetBinaryDecoder(carbonLoadModel);
 
     validateRangeColumn(optionsFinal, carbonLoadModel);
+
+    carbonLoadModel.setMetrics(new DataLoadMetrics());
   }
 
   private void validateRangeColumn(Map<String, String> optionsFinal,
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index fbb6252..ebd3f7d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -418,6 +418,9 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
           carbonFactHandler.closeHandler();
         }
       }
+      if (configuration.getMetrics() != null) {
+        configuration.getMetrics().addOutputRows(rowCounter.get());
+      }
     }
   }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index 464959c..676c40e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -258,6 +258,9 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
         carbonFactHandler.closeHandler();
       }
     }
+    if (configuration.getMetrics() != null) {
+      configuration.getMetrics().addOutputRows(rowCounter.get());
+    }
   }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 0ad37e8..fd54135 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.index.IndexWriterListener;
@@ -160,7 +160,7 @@ public class CarbonFactDataHandlerModel {
   // this will help in knowing complex byte array will be divided into how may new pages.
   private int noDictAllComplexColumnDepth;
 
-  private OutputFilesInfoHolder outputFilesInfoHolder;
+  private DataLoadMetrics metrics;
 
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
@@ -249,8 +249,7 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.indexWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
     carbonFactDataHandlerModel.initNumberOfCores();
-    carbonFactDataHandlerModel
-        .setOutputFilesInfoHolder(configuration.getOutputFilesInfoHolder());
+    carbonFactDataHandlerModel.setMetrics(configuration.getMetrics());
     return carbonFactDataHandlerModel;
   }
 
@@ -321,7 +320,7 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel
         .setColumnLocalDictGenMap(CarbonUtil.getLocalDictionaryModel(carbonTable));
     carbonFactDataHandlerModel.sortScope = carbonTable.getSortScope();
-    carbonFactDataHandlerModel.setOutputFilesInfoHolder(loadModel.getOutputFilesInfoHolder());
+    carbonFactDataHandlerModel.setMetrics(loadModel.getMetrics());
     return carbonFactDataHandlerModel;
   }
 
@@ -657,12 +656,12 @@ public class CarbonFactDataHandlerModel {
     this.noDictAllComplexColumnDepth = noDictAllComplexColumnDepth;
   }
 
-  public OutputFilesInfoHolder getOutputFilesInfoHolder() {
-    return outputFilesInfoHolder;
+  public DataLoadMetrics getMetrics() {
+    return metrics;
   }
 
-  public void setOutputFilesInfoHolder(OutputFilesInfoHolder outputFilesInfoHolder) {
-    this.outputFilesInfoHolder = outputFilesInfoHolder;
+  public void setMetrics(DataLoadMetrics metrics) {
+    this.metrics = metrics;
   }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index fdc717f..0dd17ee 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.OutputFilesInfoHolder;
+import org.apache.carbondata.core.util.DataLoadMetrics;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
@@ -150,7 +150,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
 
   protected ExecutorService fallbackExecutorService;
 
-  private OutputFilesInfoHolder outputFilesInfoHolder;
+  private DataLoadMetrics metrics;
 
   public AbstractFactDataWriter(CarbonFactDataHandlerModel model) {
     this.model = model;
@@ -202,7 +202,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
               "FallbackPool:" + model.getTableName() + ", range: " + model.getBucketId(),
                   true));
     }
-    this.outputFilesInfoHolder = this.model.getOutputFilesInfoHolder();
+    this.metrics = this.model.getMetrics();
   }
 
   /**
@@ -270,7 +270,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     } else {
       if (copyInCurrentThread) {
         CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
-            model.getCarbonDataDirectoryPath(), fileSizeInBytes, outputFilesInfoHolder);
+            model.getCarbonDataDirectoryPath(), fileSizeInBytes, metrics);
         FileFactory.deleteFile(carbonDataFileTempPath);
       } else {
         executorServiceSubmitList
@@ -438,7 +438,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     if (!enableDirectlyWriteDataToStorePath) {
       CarbonUtil
           .copyCarbonDataFileToCarbonStorePath(indexFileName, model.getCarbonDataDirectoryPath(),
-              fileSizeInBytes, outputFilesInfoHolder);
+              fileSizeInBytes, metrics);
       FileFactory.deleteFile(indexFileName);
     }
   }
@@ -504,7 +504,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     @Override public Void call() throws Exception {
       CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName, model.getCarbonDataDirectoryPath(),
-          fileSizeInBytes, outputFilesInfoHolder);
+          fileSizeInBytes, metrics);
       FileFactory.deleteFile(fileName);
       return null;
     }
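
As promised above, a minimal, self-contained sketch of the new metrics API.
This is hypothetical driver code, not part of the commit; the byte and row
values are invented for illustration. In the real flow the bytes come from
AbstractFactDataWriter (once per file copied to the store path) and the rows
from the writer processor steps, before SparkSQLUtil.setOutputMetrics copies
the totals into Spark's OutputMetrics as bytesWritten/recordsWritten.

    import org.apache.carbondata.core.util.DataLoadMetrics

    object DataLoadMetricsSketch {
      def main(args: Array[String]): Unit = {
        val metrics = new DataLoadMetrics
        // writers add bytes once per carbondata/index file copied to the store path
        metrics.addOutputBytes(1024L)
        metrics.addOutputBytes(2048L)
        // writer steps add the task's row counter when they finish
        metrics.addOutputRows(100L)
        // adders and getters are synchronized: one task may write from multiple threads
        println(s"bytes=${metrics.getNumOutputBytes}, rows=${metrics.getNumOutputRows}")
      }
    }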