[CARBONDATA-2844] [CARBONDATA-2865] Pass SK/AK to executor by serializing Hadoop configuration from driver.
Add SK/AK to a thread local so that on each query a new SK/AK can be passed to FileFactory.
Refactor FileFactory to accept the configuration from the thread local.
Fixed a compatibility issue from 1.3.x to 1.5.x [CARBONDATA-2865].

This closes #2623

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2a9604cd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2a9604cd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2a9604cd

Branch: refs/heads/master
Commit: 2a9604cd840dc1a552afcff23059ac9bf624e161
Parents: 1fb1f19
Author: kunal642 <kunalkapoor...@gmail.com>
Authored: Wed Aug 8 21:50:44 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Tue Aug 28 14:30:19 2018 +0530

----------------------------------------------------------------------
 .../core/datastore/impl/DFSFileReaderImpl.java |  8 ++-
 .../datastore/impl/DefaultFileTypeProvider.java |  4 +-
 .../core/datastore/impl/FileFactory.java | 27 +++++++---
 .../core/datastore/impl/FileTypeInterface.java |  2 +-
 .../scan/executor/QueryExecutorFactory.java | 12 +++--
 .../executor/impl/AbstractQueryExecutor.java |  5 +-
 .../scan/executor/impl/DetailQueryExecutor.java |  6 +++
 .../impl/SearchModeDetailQueryExecutor.java |  4 +-
 .../SearchModeVectorDetailQueryExecutor.java |  5 +-
 .../impl/VectorDetailQueryExecutor.java |  6 +++
 .../carbondata/core/util/CarbonProperties.java |  6 +--
 .../apache/carbondata/core/util/CarbonUtil.java |  6 +--
 .../carbondata/core/util/SessionParams.java |  5 +-
 .../core/util/ThreadLocalSessionInfo.java | 20 +++++++
 .../store/impl/DFSFileReaderImplUnitTest.java |  3 +-
 .../datamap/lucene/LuceneDataMapWriter.java |  2 +-
 .../carbondata/hadoop/CarbonRecordReader.java | 10 ++--
 .../hadoop/api/CarbonInputFormat.java |  3 +-
 .../hadoop/api/CarbonTableInputFormat.java |  1 -
 .../hadoop/api/CarbonTableOutputFormat.java |  7 ++-
 .../hadoop/util/CarbonInputFormatUtil.java | 17 ------
 .../carbondata/hive/CarbonHiveRecordReader.java |  2 +-
 .../presto/CarbondataPageSourceProvider.java |  3 +-
 .../PrestoCarbonVectorizedRecordReader.java |  3 +-
 .../presto/impl/CarbonTableReader.java |  2 +-
 .../carbondata/presto/server/PrestoServer.scala |  1 +
 ...eneFineGrainDataMapWithSearchModeSuite.scala |  3 +-
 .../createTable/TestCreateTableAsSelect.scala | 10 ++--
 .../carbondata/spark/load/CsvRDDHelper.scala |  9 ++--
 .../load/DataLoadProcessBuilderOnSpark.scala |  8 ++-
 .../load/DataLoadProcessorStepOnSpark.scala | 10 ++--
 .../spark/rdd/AlterTableAddColumnRDD.scala |  9 ++--
 .../spark/rdd/AlterTableDropColumnRDD.scala | 11 ++--
 .../spark/rdd/AlterTableLoadPartitionRDD.scala |  7 +--
 .../spark/rdd/CarbonDropPartitionRDD.scala | 11 ++--
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala | 28 +++++-----
 .../spark/rdd/CarbonIUDMergerRDD.scala | 15 +++---
 .../carbondata/spark/rdd/CarbonMergerRDD.scala | 15 +++---
 .../apache/carbondata/spark/rdd/CarbonRDD.scala | 55 +++++++++-----------
 .../spark/rdd/CarbonScanPartitionRDD.scala | 15 +++---
 .../carbondata/spark/rdd/CarbonScanRDD.scala | 15 +++---
 .../spark/rdd/NewCarbonDataLoadRDD.scala | 53 ++++++-------------
 .../carbondata/spark/rdd/SparkDataMapJob.scala | 21 ++++----
 .../carbondata/spark/rdd/StreamHandoffRDD.scala | 14 +++--
 .../carbondata/spark/util/CommonUtil.scala | 10 ++--
 .../spark/util/GlobalDictionaryUtil.scala | 20 ++++---
 .../apache/spark/rdd/CarbonMergeFilesRDD.scala | 11 ++--
 .../apache/spark/rdd/DataLoadCoalescedRDD.scala |  7 +--
 .../command/carbonTableSchemaCommon.scala |  2 +-
 .../apache/spark/sql/util/SparkSQLUtil.scala |  4 ++
 .../org/apache/spark/util/PartitionUtils.scala |  2 +-
 .../VectorizedCarbonRecordReader.java |  3 +-
 .../datasources/SparkCarbonFileFormat.scala |  3 +-
 .../datamap/IndexDataMapRebuildRDD.scala | 16 +++---
 .../spark/rdd/CarbonDataRDDFactory.scala | 26 +++++----
 .../spark/rdd/CarbonTableCompactor.scala |  4 +-
 .../org/apache/spark/sql/CarbonCountStar.scala |  6 ++-
 .../sql/CarbonDatasourceHadoopRelation.scala |  1 -
 .../spark/sql/CarbonDictionaryDecoder.scala | 15 ++++--
 .../scala/org/apache/spark/sql/CarbonEnv.scala |  6 +--
 .../org/apache/spark/sql/CarbonSession.scala |  3 +-
 .../org/apache/spark/sql/CarbonSource.scala |  1 -
 .../sql/events/MergeIndexEventListener.scala | 14 ++---
 .../management/CarbonInsertIntoCommand.scala |  7 +--
 .../management/CarbonLoadDataCommand.scala | 19 ++++---
 .../command/mutation/DeleteExecution.scala | 16 +++---
 .../command/mutation/HorizontalCompaction.scala |  6 +++
 ...rbonAlterTableDropHivePartitionCommand.scala |  2 +-
 .../CarbonAlterTableAddColumnCommand.scala |  4 +-
 .../CarbonAlterTableDropColumnCommand.scala |  2 +-
 .../table/CarbonCreateTableCommand.scala |  6 ++-
 .../sql/execution/strategy/DDLStrategy.scala |  4 +-
 .../org/apache/spark/util/TableLoader.scala |  3 +-
 .../merger/CarbonCompactionExecutor.java | 12 +++-
 .../spliter/AbstractCarbonQueryExecutor.java |  7 ++-
 .../partition/spliter/CarbonSplitExecutor.java |  6 ++-
 .../sdk/file/CarbonReaderBuilder.java |  3 ++
 .../sdk/file/CarbonWriterBuilder.java |  6 +++
 .../carbondata/sdk/file/JsonCarbonWriter.java |  3 +-
 .../store/worker/SearchRequestHandler.java |  3 +-
 80 files changed, 428 insertions(+), 314 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
index 1a0cd41..e86fa12 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;

 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,7 +38,10 @@ public class DFSFileReaderImpl implements FileReader {

   private boolean readPageByPage;

-  public DFSFileReaderImpl() {
+  private Configuration configuration;
+
+  public DFSFileReaderImpl(Configuration configuration) {
+    this.configuration = configuration;
     this.fileNameAndStreamCache =
         new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   }
@@ -60,7 +64,7 @@ public class DFSFileReaderImpl implements FileReader {
     FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
     if (null == fileChannel) {
       Path pt = new Path(filePath);
-      FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+      FileSystem fs = pt.getFileSystem(configuration);
       fileChannel = fs.open(pt);
       fileNameAndStreamCache.put(filePath, fileChannel);
     }
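The effect of this first change is that DFSFileReaderImpl no longer consults the static FileFactory configuration on every FileSystem lookup; it uses whatever Configuration it was constructed with. Below is a minimal sketch (not part of the patch) of handing a per-query Configuration carrying S3 credentials to the reader; the fs.s3a.* keys are the standard hadoop-aws property names and the values are placeholders:

    import org.apache.carbondata.core.datastore.impl.DFSFileReaderImpl;
    import org.apache.hadoop.conf.Configuration;

    public class ReaderConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // placeholder credentials; in the patch these arrive from the driver's Hadoop conf
        conf.set("fs.s3a.access.key", "<access-key>");
        conf.set("fs.s3a.secret.key", "<secret-key>");
        // every pt.getFileSystem(...) inside the reader now sees this instance
        DFSFileReaderImpl reader = new DFSFileReaderImpl(conf);
      }
    }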
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
index c4761c9..937b5b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration;

 public class DefaultFileTypeProvider implements FileTypeInterface {

-  public FileReader getFileHolder(FileFactory.FileType fileType) {
+  public FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration) {
     switch (fileType) {
       case LOCAL:
         return new FileReaderImpl();
@@ -37,7 +37,7 @@ public class DefaultFileTypeProvider implements FileTypeInterface {
       case ALLUXIO:
       case VIEWFS:
       case S3:
-        return new DFSFileReaderImpl();
+        return new DFSFileReaderImpl(configuration);
       default:
         return new FileReaderImpl();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index e353623..b07d11b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;

 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -59,11 +60,23 @@ public final class FileFactory {
   }

   public static Configuration getConfiguration() {
-    return configuration;
+    Configuration conf;
+    Object confObject = ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo()
+        .getNonSerializableExtraInfo().get("carbonConf");
+    if (confObject == null) {
+      conf = configuration;
+    } else {
+      conf = (Configuration) confObject;
+    }
+    return conf;
   }

   public static FileReader getFileHolder(FileType fileType) {
-    return fileFileTypeInterface.getFileHolder(fileType);
+    return fileFileTypeInterface.getFileHolder(fileType, getConfiguration());
+  }
+
+  public static FileReader getFileHolder(FileType fileType, Configuration configuration) {
+    return fileFileTypeInterface.getFileHolder(fileType, configuration);
   }

   public static FileType getFileType(String path) {
@@ -100,7 +113,7 @@ public final class FileFactory {
   public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
       throws IOException {
-    return getDataInputStream(path, fileType, bufferSize, configuration);
+    return getDataInputStream(path, fileType, bufferSize, getConfiguration());
   }

   public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize,
       Configuration configuration) throws IOException {
@@ -306,7 +319,7 @@ public final class FileFactory {
     // this method was new in hadoop 2.7, otherwise use CarbonFile.truncate to do this.
     try {
       Path pt = new Path(path);
-      FileSystem fs = pt.getFileSystem(configuration);
+      FileSystem fs = pt.getFileSystem(getConfiguration());
       Method truncateMethod = fs.getClass().getDeclaredMethod("truncate",
           new Class[]{Path.class, long.class});
       truncateMethod.invoke(fs, new Object[]{pt, newSize});
@@ -414,7 +427,7 @@ public final class FileFactory {
       case VIEWFS:
       case S3:
         Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
+        FileSystem fs = path.getFileSystem(getConfiguration());
         return fs.getContentSummary(path).getLength();
       case LOCAL:
       default:
@@ -442,7 +455,7 @@ public final class FileFactory {
   * @throws IOException
   */
  public static FileSystem getFileSystem(Path path) throws IOException {
-    return path.getFileSystem(configuration);
+    return path.getFileSystem(getConfiguration());
  }
@@ -455,7 +468,7 @@ public final class FileFactory {
      case VIEWFS:
        try {
          Path path = new Path(directoryPath);
-          FileSystem fs = path.getFileSystem(FileFactory.configuration);
+          FileSystem fs = path.getFileSystem(getConfiguration());
          if (!fs.exists(path)) {
            fs.mkdirs(path);
            fs.setPermission(path, permission);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
index 358d2ef..8b0fcc4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;

 public interface FileTypeInterface {

-  FileReader getFileHolder(FileFactory.FileType fileType);
+  FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration);
   CarbonFile getCarbonFile(String path, FileFactory.FileType fileType);
   CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration configuration);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
index b790f1c..2a9c7f4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
@@ -23,24 +23,26 @@ import org.apache.carbondata.core.scan.executor.impl.VectorDetailQueryExecutor;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.CarbonProperties;

+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Factory class to get the query executor from RDD
  * This will return the executor based on query type
  */
 public class QueryExecutorFactory {

-  public static QueryExecutor getQueryExecutor(QueryModel queryModel) {
+  public static QueryExecutor getQueryExecutor(QueryModel queryModel, Configuration configuration) {
     if (CarbonProperties.isSearchModeEnabled()) {
       if (queryModel.isVectorReader()) {
-        return new SearchModeVectorDetailQueryExecutor();
+        return new SearchModeVectorDetailQueryExecutor(configuration);
       } else {
-        return new SearchModeDetailQueryExecutor();
+        return new SearchModeDetailQueryExecutor(configuration);
       }
     } else {
       if (queryModel.isVectorReader()) {
-        return new VectorDetailQueryExecutor();
+        return new VectorDetailQueryExecutor(configuration);
       } else {
-        return new DetailQueryExecutor();
+        return new DetailQueryExecutor(configuration);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 259889b..ece2f8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -68,10 +68,12 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 import org.apache.carbondata.core.util.path.CarbonTablePath;

 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;

 /**
  * This class provides a skeletal implementation of the {@link QueryExecutor}
@@ -96,7 +98,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
   */
  protected CarbonIterator queryIterator;

-  public AbstractQueryExecutor() {
+  public AbstractQueryExecutor(Configuration configuration) {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
     queryProperties = new QueryExecutorProperties();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
index 46ef43d..e11c576 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
@@ -27,6 +27,8 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.result.iterator.DetailQueryResultIterator;

+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Below class will be used to execute the detail query
  * For executing the detail query it will pass all the block execution
@@ -34,6 +36,10 @@ import org.apache.carbondata.core.scan.result.iterator.DetailQueryResultIterator
  */
 public class DetailQueryExecutor extends AbstractQueryExecutor<RowBatch> {

+  public DetailQueryExecutor(Configuration configuration) {
+    super(configuration);
+  }
+
   @Override
   public CarbonIterator<RowBatch> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {
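With the constructors above, every query executor now receives the Configuration at creation time, and AbstractQueryExecutor installs it on the executing thread before anything else runs, so later FileFactory calls on that thread resolve the per-query SK/AK. A minimal caller-side sketch (queryModel and conf are assumed to be already built by the caller):

    import org.apache.carbondata.core.scan.executor.QueryExecutor;
    import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
    import org.apache.carbondata.core.scan.model.QueryModel;
    import org.apache.hadoop.conf.Configuration;

    public class ExecutorFactorySketch {
      public static QueryExecutor newExecutor(QueryModel queryModel, Configuration conf) {
        // the factory now threads the conf into whichever executor it picks
        // (vector or row detail, search mode or normal)
        return QueryExecutorFactory.getQueryExecutor(queryModel, conf);
      }
    }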
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
index ae14327..6d03540 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
@@ -31,13 +31,15 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
 import org.apache.carbondata.core.util.CarbonProperties;

+import org.apache.hadoop.conf.Configuration;

 public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object> {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(SearchModeDetailQueryExecutor.class.getName());
   private static ExecutorService executorService = null;

-  public SearchModeDetailQueryExecutor() {
+  public SearchModeDetailQueryExecutor(Configuration configuration) {
+    super(configuration);
     if (executorService == null) {
       initThreadPool();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
index 705c451..418ef42 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
@@ -32,6 +32,8 @@ import org.apache.carbondata.core.util.CarbonProperties;

 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;

+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Below class will be used to execute the detail query and returns columnar vectors.
  */
@@ -40,7 +42,8 @@ public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<O
     LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
   private static ExecutorService executorService = null;

-  public SearchModeVectorDetailQueryExecutor() {
+  public SearchModeVectorDetailQueryExecutor(Configuration configuration) {
+    super(configuration);
     if (executorService == null) {
       initThreadPool();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
index 7787e4c..46397c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
@@ -26,11 +26,17 @@ import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.VectorDetailQueryResultIterator;

+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Below class will be used to execute the detail query and returns columnar vectors.
  */
 public class VectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {

+  public VectorDetailQueryExecutor(Configuration configuration) {
+    super(configuration);
+  }
+
   @Override
   public CarbonIterator<Object> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index c3a4934..58fef17 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -23,11 +23,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.text.SimpleDateFormat;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;

 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -94,7 +94,7 @@ public final class CarbonProperties {
   /**
    * It is purely for testing
    */
-  private Map<String, String> addedProperty = new HashMap<>();
+  private Map<String, String> addedProperty = new ConcurrentHashMap<>();

   /**
    * Private constructor this will call load properties method to load all the
@@ -407,7 +407,7 @@ public final class CarbonProperties {
   * @param lockTypeConfigured
   */
  private void validateAndConfigureLockType(String lockTypeConfigured) {
-    Configuration configuration = new Configuration(true);
+    Configuration configuration = FileFactory.getConfiguration();
     String defaultFs = configuration.get("fs.defaultFS");
     if (null != defaultFs && (defaultFs.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
         || defaultFs.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || defaultFs

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 9aaa58c..c5e2e8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -134,8 +134,6 @@ public final class CarbonUtil {
   */
  private static final int CONST_HUNDRED = 100;

-  private static final Configuration conf = new Configuration(true);
-
  /**
   * dfs.bytes-per-checksum
   * HDFS checksum length, block size for a file should be exactly divisible
@@ -662,7 +660,7 @@ public final class CarbonUtil {
   */
  public static String checkAndAppendHDFSUrl(String filePath) {
    String currentPath = filePath;
-    String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
+    String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS);
    String baseDFSUrl = CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL, "");
    if (checkIfPrefixExists(filePath)) {
@@ -699,7 +697,7 @@ public final class CarbonUtil {
      filePath = "/" + filePath;
    }
    currentPath = filePath;
-    String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
+    String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS);
    if (defaultFsUrl == null) {
      return currentPath;
    }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 169c003..51b157f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.util;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;

 import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.common.logging.LogService;
@@ -57,12 +58,12 @@ public class SessionParams implements Serializable, Cloneable {

   private static final long serialVersionUID = -7801994600594915264L;
   private Map<String, String> sProps;
-  private Map<String, String> addedProps;
+  private ConcurrentHashMap<String, String> addedProps;
   // below field to be used when we want the objects to be serialized
   private Map<String, Object> extraInfo;
   public SessionParams() {
     sProps = new HashMap<>();
-    addedProps = new HashMap<>();
+    addedProps = new ConcurrentHashMap<>();
     extraInfo = new HashMap<>();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
index df525bc..f85a350 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
@@ -17,6 +17,8 @@

 package org.apache.carbondata.core.util;

+import org.apache.hadoop.conf.Configuration;
+
 /**
  * This class maintains ThreadLocal session params
  */
@@ -31,4 +33,22 @@ public class ThreadLocalSessionInfo {
   public static CarbonSessionInfo getCarbonSessionInfo() {
     return threadLocal.get();
   }
+
+  public static synchronized CarbonSessionInfo getOrCreateCarbonSessionInfo() {
+    CarbonSessionInfo info = threadLocal.get();
+    if (info == null || info.getSessionParams() == null) {
+      info = new CarbonSessionInfo();
+      info.setSessionParams(new SessionParams());
+      threadLocal.set(info);
+    }
+    return info;
+  }
+
+  public static void setConfigurationToCurrentThread(Configuration configuration) {
+    getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo().put("carbonConf", configuration);
+  }
+
+  public static void unsetAll() {
+    threadLocal.remove();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
index 30144c1..5033713 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.impl.DFSFileReaderImpl;

 import mockit.Mock;
 import mockit.MockUp;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,7 +46,7 @@ public class DFSFileReaderImplUnitTest {
   private static File fileWithEmptyContent;

   @BeforeClass public static void setup() {
-    dfsFileHolder = new DFSFileReaderImpl();
+    dfsFileHolder = new DFSFileReaderImpl(new Configuration());
     file = new File("Test.carbondata");
     fileWithEmptyContent = new File("TestEXception.carbondata");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 605ec89..bdb17ed 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -165,7 +165,7 @@ public class LuceneDataMapWriter extends DataMapWriter {

     // the indexWriter closes the FileSystem on closing the writer, so for a new configuration
     // and disable the cache for the index writer, it will be closed on closing the writer
-    Configuration conf = new Configuration();
+    Configuration conf = FileFactory.getConfiguration();
     conf.set("fs.hdfs.impl.disable.cache", "true");

     // create a index writer
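ThreadLocalSessionInfo is the carrier that makes this work on executor threads: setConfigurationToCurrentThread stores the driver's Configuration under the "carbonConf" key of the session's non-serializable extra info, and FileFactory.getConfiguration falls back to the static configuration only when that key is absent. A small sketch (not from the patch) of the per-thread contract, using only the APIs added above:

    import org.apache.carbondata.core.datastore.impl.FileFactory;
    import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
    import org.apache.hadoop.conf.Configuration;

    public class ThreadLocalConfSketch {
      public static void main(String[] args) throws InterruptedException {
        final Configuration driverConf = new Configuration();
        driverConf.set("fs.s3a.access.key", "<access-key>"); // placeholder value
        Thread worker = new Thread(() -> {
          // install the conf on this thread; FileFactory.getConfiguration() returns it from now on
          ThreadLocalSessionInfo.setConfigurationToCurrentThread(driverConf);
          Configuration seen = FileFactory.getConfiguration();
          System.out.println(seen.get("fs.s3a.access.key"));
          // clear the thread local so a pooled thread does not leak credentials across queries
          ThreadLocalSessionInfo.unsetAll();
        });
        worker.start();
        worker.join();
      }
    }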
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index a54e7a4..0d38906 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;

@@ -57,15 +58,16 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
   private boolean skipClearDataMapAtClose = false;

   public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
-      InputMetricsStats inputMetricsStats) {
-    this(queryModel, readSupport);
+      InputMetricsStats inputMetricsStats, Configuration configuration) {
+    this(queryModel, readSupport, configuration);
     this.inputMetricsStats = inputMetricsStats;
   }

-  public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) {
+  public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
+      Configuration configuration) {
     this.queryModel = queryModel;
     this.readSupport = readSupport;
-    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel, configuration);
   }

   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 21ef6cf..3ebd6d6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -583,7 +583,8 @@ m filterExpression
     Configuration configuration = taskAttemptContext.getConfiguration();
     QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
     CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
-    return new CarbonRecordReader<T>(queryModel, readSupport);
+    return new CarbonRecordReader<T>(queryModel, readSupport,
+        taskAttemptContext.getConfiguration());
   }

   public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index af2cf83..ec201b9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -132,7 +132,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
-
     CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 5938c20..5cc275b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.processing.loading.DataLoadExecutor;
 import org.apache.carbondata.processing.loading.TableProcessingOperations;
@@ -231,7 +232,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje

   @Override
   public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
-      TaskAttemptContext taskAttemptContext) throws IOException {
+      final TaskAttemptContext taskAttemptContext) throws IOException {
     final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
     //if loadModel having taskNo already(like in SDK) then no need to overwrite
     short sdkUserCore = loadModel.getSdkUserCores();
@@ -249,10 +250,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext);
     final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
     final ExecutorService executorService = Executors.newFixedThreadPool(1,
-        new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));;
+        new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));
     // It should be started in new thread as the underlying iterator uses blocking queue.
     Future future = executorService.submit(new Thread() {
       @Override public void run() {
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(taskAttemptContext
+            .getConfiguration());
         try {
           dataLoadExecutor
               .execute(loadModel, tempStoreLocations, iterators);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index af7397b..7641427 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datamap.DataMapJob;
 import org.apache.carbondata.core.datamap.DataMapUtil;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -177,20 +176,4 @@ public class CarbonInputFormatUtil {
     return new JobID(jobtrackerID, batch);
   }

-  public static void setS3Configurations(Configuration hadoopConf) {
-    FileFactory.getConfiguration()
-        .set("fs.s3a.access.key", hadoopConf.get("fs.s3a.access.key", ""));
-    FileFactory.getConfiguration()
-        .set("fs.s3a.secret.key", hadoopConf.get("fs.s3a.secret.key", ""));
-    FileFactory.getConfiguration()
-        .set("fs.s3a.endpoint", hadoopConf.get("fs.s3a.endpoint", ""));
-    FileFactory.getConfiguration().set(CarbonCommonConstants.S3_ACCESS_KEY,
-        hadoopConf.get(CarbonCommonConstants.S3_ACCESS_KEY, ""));
-    FileFactory.getConfiguration().set(CarbonCommonConstants.S3_SECRET_KEY,
-        hadoopConf.get(CarbonCommonConstants.S3_SECRET_KEY, ""));
-    FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_ACCESS_KEY,
-        hadoopConf.get(CarbonCommonConstants.S3N_ACCESS_KEY, ""));
-    FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_SECRET_KEY,
-        hadoopConf.get(CarbonCommonConstants.S3N_SECRET_KEY, ""));
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
index 57bcca3..4ed2b91 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
@@ -63,7 +63,7 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>

   public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable> readSupport,
       InputSplit inputSplit, JobConf jobConf) throws IOException {
-    super(queryModel, readSupport);
+    super(queryModel, readSupport, jobConf);
     initialize(inputSplit, jobConf);
   }
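The Spark-side changes below rely on the fact that Hadoop's Configuration is not java.io.Serializable, which is why the driver wraps it in the project's SerializableConfiguration before broadcasting it to tasks. As a hedged illustration, here is a minimal Java analogue of that wrapper (the real one used by this patch is the Scala class in org.apache.carbondata.spark.rdd), built on the Writable support Configuration already provides:

    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import org.apache.hadoop.conf.Configuration;

    public class SerializableConfSketch implements Serializable {
      private transient Configuration conf;

      public SerializableConfSketch(Configuration conf) {
        this.conf = conf;
      }

      public Configuration value() {
        return conf;
      }

      private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        conf.write(out); // Configuration implements Writable
      }

      private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        conf = new Configuration(false);
        conf.readFields(in);
      }
    }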
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java index 5b15b22..3ec815d 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java @@ -100,7 +100,8 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider checkArgument(carbondataSplit.getConnectorId().equals(connectorId), "split is not for this connector"); QueryModel queryModel = createQueryModel(carbondataSplit, columns); - QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); + QueryExecutor queryExecutor = + QueryExecutorFactory.getQueryExecutor(queryModel, new Configuration()); try { CarbonIterator iterator = queryExecutor.execute(queryModel); readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java index 32e163a..9935b54 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java @@ -114,7 +114,8 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> { queryModel.setTableBlockInfos(tableBlockInfoList); queryModel.setVectorReader(true); try { - queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); + queryExecutor = + QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration()); iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel); } catch (QueryExecutionException e) { throw new InterruptedException(e.getMessage()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java index 7916932..5a1e140 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java @@ -414,7 +414,7 @@ public class CarbonTableReader { List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>(); CarbonTable carbonTable = tableCacheModel.carbonTable; TableInfo tableInfo = tableCacheModel.carbonTable.getTableInfo(); - Configuration config = new Configuration(); + Configuration config = FileFactory.getConfiguration(); 
config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, ""); String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath(); config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala ---------------------------------------------------------------------- diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala index 127c4c9..2f3b8f4 100644 --- a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala +++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala @@ -32,6 +32,7 @@ import com.facebook.presto.tests.DistributedQueryRunner import com.google.common.collect.ImmutableMap import org.slf4j.{Logger, LoggerFactory} +import org.apache.carbondata.core.util.ThreadLocalSessionInfo import org.apache.carbondata.presto.CarbondataPlugin object PrestoServer { http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala index 0ac6e72..6cbe747 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala @@ -41,8 +41,7 @@ class LuceneFineGrainDataMapWithSearchModeSuite extends QueryTest with BeforeAnd val n = 500000 sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode() CarbonProperties - .getInstance() - .addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s") + .getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s") LuceneFineGrainDataMapSuite.createFile(file2, n) sql("create database if not exists lucene") sql("use lucene") http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala index 062e5ba..c95e5a4 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala @@ -146,13 +146,13 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll { } test("test create table as select with TBLPROPERTIES") { - sql("DROP TABLE IF EXISTS ctas_tblproperties_test") + sql("DROP TABLE IF EXISTS 
ctas_tblproperties_testt") sql( - "create table ctas_tblproperties_test stored by 'carbondata' TBLPROPERTIES" + + "create table ctas_tblproperties_testt stored by 'carbondata' TBLPROPERTIES" + "('DICTIONARY_INCLUDE'='key', 'sort_scope'='global_sort') as select * from carbon_ctas_test") - checkAnswer(sql("select * from ctas_tblproperties_test"), sql("select * from carbon_ctas_test")) + checkAnswer(sql("select * from ctas_tblproperties_testt"), sql("select * from carbon_ctas_test")) val carbonTable = CarbonEnv.getInstance(Spark2TestQueryExecutor.spark).carbonMetastore - .lookupRelation(Option("default"), "ctas_tblproperties_test")(Spark2TestQueryExecutor.spark) + .lookupRelation(Option("default"), "ctas_tblproperties_testt")(Spark2TestQueryExecutor.spark) .asInstanceOf[CarbonRelation].carbonTable val metadataFolderPath: CarbonFile = FileFactory.getCarbonFile(carbonTable.getMetadataPath) assert(metadataFolderPath.exists()) @@ -419,7 +419,7 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS ctas_select_where_carbon") sql("DROP TABLE IF EXISTS ctas_select_where_parquet") sql("DROP TABLE IF EXISTS ctas_select_where_orc") - sql("DROP TABLE IF EXISTS ctas_tblproperties_test") + sql("DROP TABLE IF EXISTS ctas_tblproperties_testt") sql("DROP TABLE IF EXISTS ctas_if_table_name") sql("DROP TABLE IF EXISTS source_table") sql("DROP TABLE IF EXISTS target_table") http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala index 36d8c51..8d6dd32 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala @@ -37,6 +37,8 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P import org.apache.spark.sql.util.SparkSQLUtil.sessionState import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.ThreadLocalSessionInfo import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.spark.rdd.SerializableConfiguration @@ -108,17 +110,18 @@ object CsvRDDHelper { closePartition() // 2. 
read function - val serializableConfiguration = new SerializableConfiguration(jobConf) + val serializableConfiguration = new SerializableConfiguration(hadoopConf) val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable { override def apply(file: PartitionedFile): Iterator[InternalRow] = { new Iterator[InternalRow] { - val hadoopConf = serializableConfiguration.value val jobTrackerId: String = { val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) formatter.format(new Date()) } + ThreadLocalSessionInfo.setConfigurationToCurrentThread(serializableConfiguration.value) val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0) - val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId) + val hadoopAttemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, + attemptId) val inputSplit = new FileSplit(new Path(file.filePath), file.start, file.length, file.locations) var finished = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index dc238fb..be40b13 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -29,11 +29,13 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.row.CarbonRow import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses} import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters} import org.apache.carbondata.processing.util.CarbonDataProcessorUtil +import org.apache.carbondata.spark.rdd.SerializableConfiguration /** * Use sortBy operator in spark to load the data @@ -64,6 +66,7 @@ object DataLoadProcessBuilderOnSpark { val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator") val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator") + val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) // 1. Input val inputRDD = originRDD .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast)) @@ -73,6 +76,7 @@ object DataLoadProcessBuilderOnSpark { // 2. Convert val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) => + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter) }.filter(_ != null)// Filter the bad record @@ -116,7 +120,7 @@ object DataLoadProcessBuilderOnSpark { // 4. 
Write sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast, - writeStepRowCounter)) + writeStepRowCounter, conf)) // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will // not have any functional impact as spark automatically monitors the cache usage on each node http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 73ed769..f17bd91 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -20,6 +20,7 @@ package org.apache.carbondata.spark.load import scala.util.Random import com.univocity.parsers.common.TextParsingException +import org.apache.hadoop.conf.Configuration import org.apache.spark.{Accumulator, SparkEnv, TaskContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.Row @@ -29,7 +30,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException import org.apache.carbondata.core.datastore.row.CarbonRow -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations} import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException @@ -40,7 +42,7 @@ import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImp import org.apache.carbondata.processing.sort.sortdata.SortParameters import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory} import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil} -import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow} +import org.apache.carbondata.spark.rdd.{NewRddIterator, SerializableConfiguration, StringArrayRow} import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util} object DataLoadProcessorStepOnSpark { @@ -227,7 +229,9 @@ object DataLoadProcessorStepOnSpark { rows: Iterator[CarbonRow], index: Int, modelBroadcast: Broadcast[CarbonLoadModel], - rowCounter: Accumulator[Int]) { + rowCounter: Accumulator[Int], + conf: Broadcast[SerializableConfiguration]) { + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) var model: CarbonLoadModel = null var tableName: String = null var rowConverter: RowConverterImpl = null http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 7c1edea..f7aa623 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -17,7 +17,8 @@
 package org.apache.carbondata.spark.rdd

-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession

 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -47,15 +48,15 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
 /**
  * This class is aimed at generating dictionary file for the newly added columns
  */
-class AlterTableAddColumnRDD[K, V](sc: SparkContext,
+class AlterTableAddColumnRDD[K, V](@transient sparkSession: SparkSession,
     @transient newColumns: Seq[ColumnSchema],
     identifier: AbsoluteTableIdentifier)
-  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
+  extends CarbonRDD[(Int, SegmentStatus)](sparkSession, Nil) {

   val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
     CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)

-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
       new AddColumnPartition(id, column._2, column._1)
     }.toArray
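All of the RDD rewrites in this patch follow the same template-method shape: the concrete class gives up getPartitions and implements internalGetPartitions instead, so the base class keeps one guaranteed hook to install the configuration before any partition planning. Schematically (a sketch, not the real CarbonRDD; ThreadLocalConf is a hypothetical holder sketched a little further below):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.Partition

abstract class ConfFirstRDD {
  protected def broadcastConf: Configuration

  // Subclasses implement only this; they may assume the conf is in place.
  protected def internalGetPartitions: Array[Partition]

  // The base class owns the real entry point, so no subclass can forget
  // to publish the configuration before planning partitions.
  final def getPartitions: Array[Partition] = {
    ThreadLocalConf.set(broadcastConf)
    internalGetPartitions
  }
}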

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index 0dbb4f0..a0d06b8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -17,11 +17,12 @@ package org.apache.carbondata.spark.rdd

-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession

 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.statusmanager.SegmentStatus
@@ -44,12 +45,12 @@ class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Pa
 /**
  * This class is aimed at generating dictionary file for the newly added columns
  */
-class AlterTableDropColumnRDD[K, V](sc: SparkContext,
+class AlterTableDropColumnRDD[K, V](@transient ss: SparkSession,
     @transient newColumns: Seq[ColumnSchema],
     carbonTableIdentifier: AbsoluteTableIdentifier)
-  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
+  extends CarbonRDD[(Int, SegmentStatus)](ss, Nil) {

-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
       new DropColumnPartition(id, column._2, column._1)
     }.toArray

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 85a6f41..86a5043 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -39,7 +39,8 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     partitionIds: Seq[String],
     bucketId: Int,
     identifier: AbsoluteTableIdentifier,
-    prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {
+    prev: RDD[Array[AnyRef]])
+  extends CarbonRDD[(K, V)](alterPartitionModel.sqlContext.sparkSession, prev) {

   var storeLocation: String = null
   val carbonLoadModel = alterPartitionModel.carbonLoadModel
@@ -50,14 +51,14 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
   val factTableName = carbonTable.getTableName
   val partitionInfo = carbonTable.getPartitionInfo(factTableName)

-  override protected def getPartitions: Array[Partition] = {
+  override protected def internalGetPartitions: Array[Partition] = {
     val sc = alterPartitionModel.sqlContext.sparkContext
     sc.setLocalProperty("spark.scheduler.pool", "DDL")
     sc.setLocalProperty("spark.job.interruptOnCancel", "true")
     firstParent[Array[AnyRef]].partitions
   }

-  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+  override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
     val iter = new Iterator[(K, V)] {
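The counterpart on the read side is FileFactory.getConfiguration, which this commit refactors to consult a thread local and fall back to a fresh Configuration when nothing was propagated (the 1.3.x-compatible path). A minimal model of that holder; ThreadLocalConf is a hypothetical name, while the real logic is split across ThreadLocalSessionInfo and FileFactory:

import org.apache.hadoop.conf.Configuration

object ThreadLocalConf {
  private val holder = new ThreadLocal[Configuration]

  // Called once per task, before any file access on this thread.
  def set(conf: Configuration): Unit = holder.set(conf)

  // What FileFactory.getConfiguration amounts to: use the propagated conf
  // if present, otherwise degrade to defaults so local/HDFS setups still work.
  def get: Configuration = Option(holder.get).getOrElse(new Configuration())

  // Executor threads are pooled and reused, so tasks must clean up after
  // themselves or a later query could observe stale credentials.
  def unset(): Unit = holder.remove()
}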

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index d56e1c2..e2d1eff 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -21,7 +21,8 @@ import java.util

 import scala.collection.JavaConverters._

-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession

 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.indexstore.PartitionSpec
@@ -37,19 +38,19 @@ case class CarbonDropPartition(rddId: Int, val idx: Int, segment: Segment)

 /**
  * RDD to drop the partitions from segment files of all segments.
- * @param sc
+ * @param ss
  * @param tablePath
  * @param segments segments to be cleaned
  */
 class CarbonDropPartitionRDD(
-    sc: SparkContext,
+    @transient ss: SparkSession,
     tablePath: String,
     segments: Seq[Segment],
     partitions: util.List[PartitionSpec],
     uniqueId: String)
-  extends CarbonRDD[(String, String)](sc, Nil, sc.hadoopConfiguration) {
+  extends CarbonRDD[(String, String)](ss, Nil) {

-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     segments.zipWithIndex.map {s =>
       CarbonDropPartition(id, s._2, s._1)
     }.toArray
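A detail worth noting across these constructor changes: the SparkSession is always marked @transient. It exists only on the driver, and without the annotation closure serialization would try to drag it (and everything it references) to the executors. A tiny stand-alone illustration of the idiom, under the assumption of a trivially small RDD:

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// The session parameter is used only to reach the SparkContext at
// construction time; @transient keeps it out of the serialized closure.
class DriverSideSessionRDD(@transient ss: SparkSession)
  extends RDD[String](ss.sparkContext, Nil) {

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator(s"partition ${split.index} computed without a SparkSession")

  override protected def getPartitions: Array[Partition] =
    Array(new Partition { override def index: Int = 0 })
}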

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 2ec8b9c..9265c7f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -31,7 +31,7 @@ import com.univocity.parsers.common.TextParsingException
 import org.apache.commons.lang3.{ArrayUtils, StringUtils}
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SparkSession}

 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
@@ -174,11 +174,12 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S
  * @param model a model package load info
  */
 class CarbonAllDictionaryCombineRDD(
+    @transient sparkSession: SparkSession,
     prev: RDD[(String, Iterable[String])],
     model: DictionaryLoadModel)
-  extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
+  extends CarbonRDD[(Int, ColumnDistinctValues)](sparkSession, prev) {

-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     firstParent[(String, Iterable[String])].partitions
   }

@@ -267,11 +268,12 @@ class StringArrayRow(var values: Array[String]) extends Row {
  * @param model a model package load info
  */
 class CarbonBlockDistinctValuesCombineRDD(
+    @transient ss: SparkSession,
     prev: RDD[Row],
     model: DictionaryLoadModel)
-  extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
+  extends CarbonRDD[(Int, ColumnDistinctValues)](ss, prev) {

-  override def getPartitions: Array[Partition] = firstParent[Row].partitions
+  override def internalGetPartitions: Array[Partition] = firstParent[Row].partitions

   override def internalCompute(split: Partition,
       context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -325,11 +327,14 @@ class CarbonBlockDistinctValuesCombineRDD(
  * @param model a model package load info
  */
 class CarbonGlobalDictionaryGenerateRDD(
+    @transient sparkSession: SparkSession,
     prev: RDD[(Int, ColumnDistinctValues)],
     model: DictionaryLoadModel)
-  extends CarbonRDD[(Int, SegmentStatus)](prev) {
+  extends CarbonRDD[(Int, SegmentStatus)](sparkSession, prev) {
+
+  override def internalGetPartitions: Array[Partition] =
+    firstParent[(Int, ColumnDistinctValues)].partitions

-  override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
   override def internalCompute(split: Partition,
       context: TaskContext): Iterator[(Int, SegmentStatus)] = {
@@ -492,21 +497,20 @@ class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
  * Use external column dict to generate global dictionary
  *
  * @param carbonLoadModel carbon load model
- * @param sparkContext    spark context
+ * @param sparkSession    spark context
  * @param table           carbon table identifier
  * @param dimensions      carbon dimenisons having predefined dict
  * @param dictFolderPath  path of dictionary folder
  */
 class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     dictionaryLoadModel: DictionaryLoadModel,
-    sparkContext: SparkContext,
+    @transient ss: SparkSession,
     table: CarbonTableIdentifier,
     dimensions: Array[CarbonDimension],
     dictFolderPath: String)
-  extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil,
-    sparkContext.hadoopConfiguration) {
+  extends CarbonRDD[(Int, ColumnDistinctValues)](ss, Nil) {

-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     val primDimensions = dictionaryLoadModel.primDimensions
     val primDimLength = primDimensions.length
     val result = new Array[Partition](primDimLength)
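The merger RDDs below repeat one fix twice: job setup used to start from new Configuration(), which reads only the default resources. Building the JobConf from the propagated configuration instead is what lets the InputFormat see the S3 credentials. As a sketch, using the hypothetical ThreadLocalConf from above in place of FileFactory.getConfiguration:

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job

object JobSetup {
  // Before: new JobConf(new Configuration) -- credentials silently dropped.
  // After: seed the JobConf with the conf propagated from the driver.
  def newJob(): Job = {
    val jobConf = new JobConf(ThreadLocalConf.get)
    Job.getInstance(jobConf)
  }
}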

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 3aaf0ae..762b920 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -24,14 +24,15 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.Partition
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.CarbonMergerMapping

-import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.api.CarbonInputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -41,23 +42,23 @@ import org.apache.carbondata.spark.MergeResult
  * IUD carbon merger RDD
  *
  */
 class CarbonIUDMergerRDD[K, V](
-    sc: SparkContext,
+    @transient ss: SparkSession,
     result: MergeResult[K, V],
     carbonLoadModel: CarbonLoadModel,
     carbonMergerMapping: CarbonMergerMapping,
     confExecutorsTemp: String)
-  extends CarbonMergerRDD[K, V](sc,
+  extends CarbonMergerRDD[K, V](ss,
     result,
     carbonLoadModel,
     carbonMergerMapping,
     confExecutorsTemp) {

-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
       tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
-    val jobConf: JobConf = new JobConf(new Configuration)
+    val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
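Why new Configuration() was a bug here and not just a style issue: a freshly constructed Configuration loads core-default.xml and core-site.xml, but knows nothing about properties set programmatically on the driver's conf, which is exactly where dynamically supplied SK/AK live. A short demonstration; the key name is the standard s3a property and the value is a placeholder:

import org.apache.hadoop.conf.Configuration

object LostCredentialDemo extends App {
  val driverConf = new Configuration()
  driverConf.set("fs.s3a.access.key", "EXAMPLE-ACCESS-KEY")  // set at runtime on the driver

  val freshConf = new Configuration()                 // what `new Configuration` yields elsewhere
  assert(freshConf.get("fs.s3a.access.key") == null)  // programmatic settings do not carry over
}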

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index f9f65a7..a0425b7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block._
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
@@ -60,15 +61,15 @@ import org.apache.carbondata.spark.MergeResult
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}

 class CarbonMergerRDD[K, V](
-    sc: SparkContext,
+    @transient ss: SparkSession,
     result: MergeResult[K, V],
     carbonLoadModel: CarbonLoadModel,
     carbonMergerMapping: CarbonMergerMapping,
     confExecutorsTemp: String)
-  extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
+  extends CarbonRDD[(K, V)](ss, Nil) {

-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-  sc.setLocalProperty("spark.job.interruptOnCancel", "true")
+  ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL")
+  ss.sparkContext.setLocalProperty("spark.job.interruptOnCancel", "true")

   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
   var storeLocation: String = null
@@ -183,7 +184,7 @@ class CarbonMergerRDD[K, V](
         }
         try {
           // fire a query and get the results.
-          rawResultIteratorList = exec.processTableBlocks()
+          rawResultIteratorList = exec.processTableBlocks(FileFactory.getConfiguration)
         } catch {
           case e: Throwable =>
             LOGGER.error(e)
@@ -269,7 +270,7 @@ class CarbonMergerRDD[K, V](
     iter
   }

-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
       tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
@@ -277,7 +278,7 @@ class CarbonMergerRDD[K, V](
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
       carbonTable)
-    val jobConf: JobConf = new JobConf(new Configuration)
+    val jobConf: JobConf = new JobConf(getConf)
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
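The CarbonRDD change that follows is the centerpiece of the patch: the old code serialized, compressed and embedded the whole Hadoop conf into every RDD instance, then decompressed and re-parsed it on each getConf call; the new code broadcasts one SerializableConfiguration per RDD and reads the same instance everywhere. Schematically, reusing the wrapper sketched earlier (ConfPropagation is an illustrative name):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object ConfPropagation {
  // One broadcast, created on the driver; each executor fetches it at most
  // once and every task then reads the same deserialized instance.
  def broadcastConf(sc: SparkContext,
      conf: Configuration): Broadcast[SerializableConfiguration] =
    sc.broadcast(new SerializableConfiguration(conf))

  // getConf becomes a cheap field access instead of decompress-and-parse.
  def getConf(b: Broadcast[SerializableConfiguration]): Configuration = b.value.value
}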

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 54a7530..04f20b1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -17,28 +17,24 @@
 package org.apache.carbondata.spark.rdd

-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
-
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag

 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.util.SparkSQLUtil

-import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util._
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil

 /**
  * This RDD maintains session level ThreadLocal
  */
-abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
-    @transient private var deps: Seq[Dependency[_]],
-    @transient hadoopConf: Configuration) extends RDD[T](sc, deps) {
+abstract class CarbonRDD[T: ClassTag](@transient ss: SparkSession,
+    @transient private var deps: Seq[Dependency[_]]) extends RDD[T](ss.sparkContext, deps) {

   val carbonSessionInfo: CarbonSessionInfo = {
     var info = ThreadLocalSessionInfo.getCarbonSessionInfo
@@ -50,24 +46,27 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
     info
   }

-  private val confBytes = {
-    val bao = new ByteArrayOutputStream()
-    val oos = new ObjectOutputStream(bao)
-    hadoopConf.write(oos)
-    oos.close()
-    CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
-  }
+  val config: Broadcast[SerializableConfiguration] = sparkContext
+    .broadcast(new SerializableConfiguration(SparkSQLUtil.sessionState(ss).newHadoopConf()))

   /** Construct an RDD with just a one-to-one dependency on one parent */
-  def this(@transient oneParent: RDD[_]) =
-    this (oneParent.context, List(new OneToOneDependency(oneParent)),
-      oneParent.sparkContext.hadoopConfiguration)
+  def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_]) =
+    this (sparkSession, List(new OneToOneDependency(oneParent)))
+
+  protected def internalGetPartitions: Array[Partition]
+
+  override def getPartitions: Array[Partition] = {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(config.value.value)
+    internalGetPartitions
+  }

   // RDD compute logic should be here
   def internalCompute(split: Partition, context: TaskContext): Iterator[T]

   final def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    CarbonInputFormatUtil.setS3Configurations(getConf)
+    TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll())
+    carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", config
+      .value.value)
     ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
     TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
     val carbonTaskInfo = new CarbonTaskInfo
@@ -79,13 +78,7 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
   }

   def getConf: Configuration = {
-    val configuration = new Configuration(false)
-    val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
-      .unCompressByte(confBytes))
-    val ois = new ObjectInputStream(bai)
-    configuration.readFields(ois)
-    ois.close()
-    configuration
+    config.value.value
   }
 }

@@ -93,12 +86,14 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
 * This RDD contains TableInfo object which is serialized and deserialized in driver and executor
 */
 abstract class CarbonRDDWithTableInfo[T: ClassTag](
-    @transient sc: SparkContext,
+    @transient ss: SparkSession,
     @transient private var deps: Seq[Dependency[_]],
-    serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps, sc.hadoopConfiguration) {
+    serializedTableInfo: Array[Byte]) extends CarbonRDD[T](ss, deps) {

-  def this(@transient oneParent: RDD[_], serializedTableInfo: Array[Byte]) =
-    this (oneParent.context, List(new OneToOneDependency(oneParent)), serializedTableInfo)
+  def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_],
+      serializedTableInfo: Array[Byte]) = {
+    this (sparkSession, List(new OneToOneDependency(oneParent)), serializedTableInfo)
+  }

   def getTableInfo: TableInfo = TableInfo.deserialize(serializedTableInfo)
 }
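One non-obvious line in compute() above is the task-completion listener that calls ThreadLocalSessionInfo.unsetAll(). Executor threads are pooled, so without it the configuration installed for one query would leak into the next task scheduled on the same thread. The shape of that safeguard, reduced to essentials with the hypothetical ThreadLocalConf from earlier:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.TaskContext

object TaskScopedConf {
  // Publish the conf for this task and guarantee cleanup when the task ends,
  // whether it succeeds or fails, so pooled threads start from a clean slate.
  def install(conf: Configuration): Unit = {
    ThreadLocalConf.set(conf)
    Option(TaskContext.get()).foreach { ctx =>
      ctx.addTaskCompletionListener(_ => ThreadLocalConf.unset())
    }
  }
}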

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
index 9452777..241720a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.AlterPartitionModel
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.util.CarbonException
@@ -36,6 +37,7 @@ import org.apache.spark.util.PartitionUtils
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -65,7 +67,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
     absoluteTableIdentifier: AbsoluteTableIdentifier,
     partitionIds: Seq[String],
     bucketId: Int)
-  extends RDD[(AnyRef, Array[AnyRef])](alterPartitionModel.sqlContext.sparkContext, Nil) {
+  extends CarbonRDD[(AnyRef, Array[AnyRef])](alterPartitionModel.sqlContext.sparkSession, Nil) {

   private val queryId = alterPartitionModel.sqlContext.sparkContext.getConf
     .get("queryId", System.nanoTime() + "")
@@ -91,9 +93,9 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
   val dictionaryIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
   val measureIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()

-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     val parallelism = sparkContext.defaultParallelism
-    val jobConf = new JobConf(new Configuration)
+    val jobConf = new JobConf(FileFactory.getConfiguration)
     val job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonTableInputFormat(absoluteTableIdentifier,
       partitionIds.toList.asJava, job)
@@ -127,8 +129,8 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
     result.toArray(new Array[Partition](result.size()))
   }

-  override def compute(split: Partition, context: TaskContext):
-    Iterator[(AnyRef, Array[AnyRef])] = {
+  override def internalCompute(split: Partition, context: TaskContext):
+      Iterator[(AnyRef, Array[AnyRef])] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     var exec : CarbonSplitExecutor = null
     val rows : java.util.List[(AnyRef, Array[AnyRef])] = new ArrayList[(AnyRef, Array[AnyRef])]()
@@ -142,7 +144,8 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
     var result : java.util.List[PartitionSpliterRawResultIterator] = null
     try {
       exec = new CarbonSplitExecutor(segmentMapping, carbonTable)
-      result = exec.processDataBlocks(segmentId, new SparkDataTypeConverterImpl())
+      result = exec.processDataBlocks(segmentId, new SparkDataTypeConverterImpl(),
+        FileFactory.getConfiguration)
     } catch {
       case e: Throwable =>
         LOGGER.error(e)