This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new fcca6c5 [CARBONDATA-3364] Support Read from Hive. Queries are giving empty results from hive. fcca6c5 is described below commit fcca6c5b661ec02adfa17622e980a0c396bd84c2 Author: dhatchayani <dhatcha.offic...@gmail.com> AuthorDate: Mon Apr 29 18:52:57 2019 +0530 [CARBONDATA-3364] Support Read from Hive. Queries are giving empty results from hive. This closes #3192 --- .../apache/carbondata/examples/HiveExample.scala | 99 +++++++++++++--------- .../apache/carbondata/examplesCI/RunExamples.scala | 3 +- integration/hive/pom.xml | 9 +- .../carbondata/hive/CarbonHiveInputSplit.java | 8 +- .../apache/carbondata/hive/CarbonHiveSerDe.java | 2 +- .../carbondata/hive/MapredCarbonInputFormat.java | 20 ++--- .../carbondata/hive/MapredCarbonOutputFormat.java | 12 ++- .../{ => test}/server/HiveEmbeddedServer2.java | 20 ++--- integration/spark-common-test/pom.xml | 6 ++ .../TestCreateHiveTableWithCarbonDS.scala | 4 +- integration/spark-common/pom.xml | 5 ++ .../apache/spark/util/CarbonReflectionUtils.scala | 17 ++-- .../spark/util/DictionaryLRUCacheTestCase.scala | 1 + pom.xml | 1 + 14 files changed, 123 insertions(+), 84 deletions(-) diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala index b50e763..c043076 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala @@ -19,33 +19,36 @@ package org.apache.carbondata.examples import java.io.File import java.sql.{DriverManager, ResultSet, Statement} -import org.apache.spark.sql.SparkSession +import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.permission.{FsAction, FsPermission} import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.examples.util.ExampleUtils -import org.apache.carbondata.hive.server.HiveEmbeddedServer2 +import org.apache.carbondata.hive.test.server.HiveEmbeddedServer2 // scalastyle:off println object HiveExample { private val driverName: String = "org.apache.hive.jdbc.HiveDriver" - def main(args: Array[String]) { - val carbonSession = ExampleUtils.createCarbonSession("HiveExample") - exampleBody(carbonSession, CarbonProperties.getStorePath - + CarbonCommonConstants.FILE_SEPARATOR - + CarbonCommonConstants.DATABASE_DEFAULT_NAME) - carbonSession.stop() + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + private val targetLoc = s"$rootPath/examples/spark2/target" + val metaStoreLoc = s"$targetLoc/metastore_db" + val storeLocation = s"$targetLoc/store" + val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + def main(args: Array[String]) { + createCarbonTable(storeLocation) + readFromHive System.exit(0) } - def exampleBody(carbonSession: SparkSession, store: String): Unit = { - val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath + def createCarbonTable(store: String): Unit = { + + val carbonSession = ExampleUtils.createCarbonSession("HiveExample") carbonSession.sql("""DROP TABLE IF EXISTS HIVE_CARBON_EXAMPLE""".stripMargin) @@ -56,14 +59,44 @@ object HiveExample { | STORED BY 'carbondata' """.stripMargin) + val inputPath = FileFactory + .getUpdatedFilePath(s"$rootPath/examples/spark2/src/main/resources/sample.csv") + carbonSession.sql( s""" - | LOAD DATA LOCAL INPATH '$rootPath/examples/spark2/src/main/resources/sample.csv' + | LOAD DATA LOCAL INPATH '$inputPath' + | INTO TABLE HIVE_CARBON_EXAMPLE + """.stripMargin) + + carbonSession.sql( + s""" + | LOAD DATA LOCAL INPATH '$inputPath' | INTO TABLE HIVE_CARBON_EXAMPLE """.stripMargin) carbonSession.sql("SELECT * FROM HIVE_CARBON_EXAMPLE").show() + carbonSession.close() + + // delete the already existing lock on metastore so that new derby instance + // for HiveServer can run on the same metastore + checkAndDeleteDBLock + + } + + def checkAndDeleteDBLock: Unit = { + val dbLockPath = FileFactory.getUpdatedFilePath(s"$metaStoreLoc/db.lck") + val dbexLockPath = FileFactory.getUpdatedFilePath(s"$metaStoreLoc/dbex.lck") + if(FileFactory.isFileExist(dbLockPath)) { + FileFactory.deleteFile(dbLockPath, FileFactory.getFileType(dbLockPath)) + } + if(FileFactory.isFileExist(dbexLockPath)) { + FileFactory.deleteFile(dbexLockPath, FileFactory.getFileType(dbexLockPath)) + } + } + + + def readFromHive: Unit = { try { Class.forName(driverName) } @@ -72,37 +105,19 @@ object HiveExample { classNotFoundException.printStackTrace() } + // make HDFS writable + val path = new Path(targetLoc) + val fileSys = path.getFileSystem(FileFactory.getConfiguration) + fileSys.setPermission(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) + val hiveEmbeddedServer2 = new HiveEmbeddedServer2() - hiveEmbeddedServer2.start() + hiveEmbeddedServer2.start(targetLoc) val port = hiveEmbeddedServer2.getFreePort val connection = DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default", "", "") val statement: Statement = connection.createStatement logger.info(s"============HIVE CLI IS STARTED ON PORT $port ==============") - statement.execute( - s""" - | CREATE TABLE IF NOT EXISTS HIVE_CARBON_EXAMPLE - | (ID int, NAME string,SALARY double) - | ROW FORMAT SERDE 'org.apache.carbondata.hive.CarbonHiveSerDe' - | WITH SERDEPROPERTIES ('mapreduce.input.carboninputformat.databaseName'='default', - | 'mapreduce.input.carboninputformat.tableName'='HIVE_CARBON_EXAMPLE') - """.stripMargin) - - statement.execute( - s""" - | ALTER TABLE HIVE_CARBON_EXAMPLE - | SET FILEFORMAT - | INPUTFORMAT \"org.apache.carbondata.hive.MapredCarbonInputFormat\" - | OUTPUTFORMAT \"org.apache.carbondata.hive.MapredCarbonOutputFormat\" - | SERDE \"org.apache.carbondata.hive.CarbonHiveSerDe\" - """.stripMargin) - - statement - .execute( - "ALTER TABLE HIVE_CARBON_EXAMPLE SET LOCATION " + - s"'file:///$store/hive_carbon_example' ") - val resultSet: ResultSet = statement.executeQuery("SELECT * FROM HIVE_CARBON_EXAMPLE") var rowsFetched = 0 @@ -135,7 +150,7 @@ object HiveExample { rowsFetched = rowsFetched + 1 } println(s"******Total Number Of Rows Fetched ****** $rowsFetched") - assert(rowsFetched == 2) + assert(rowsFetched == 4) logger.info("Fetching the Individual Columns ") @@ -166,7 +181,7 @@ object HiveExample { } println(" ********** Total Rows Fetched When Quering The Individual Columns **********" + s"$individualColRowsFetched") - assert(individualColRowsFetched == 2) + assert(individualColRowsFetched == 4) logger.info("Fetching the Out Of Order Columns ") @@ -200,7 +215,7 @@ object HiveExample { } println(" ********** Total Rows Fetched When Quering The Out Of Order Columns **********" + s"$outOfOrderColFetched") - assert(outOfOrderColFetched == 2) + assert(outOfOrderColFetched == 4) hiveEmbeddedServer2.stop() } diff --git a/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala b/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala index c5db40b..268bf5f 100644 --- a/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala +++ b/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala @@ -127,6 +127,7 @@ class RunExamples extends QueryTest with BeforeAndAfterAll { } test("HiveExample") { - HiveExample.exampleBody(spark, TestQueryExecutor.warehouse) + HiveExample.createCarbonTable(TestQueryExecutor.warehouse) + HiveExample.readFromHive } } diff --git a/integration/hive/pom.xml b/integration/hive/pom.xml index 6df4f24..7056852 100644 --- a/integration/hive/pom.xml +++ b/integration/hive/pom.xml @@ -65,11 +65,6 @@ <scope>compile</scope> </dependency> <dependency> - <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-spark2</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-service</artifactId> <version>${hive.version}</version> @@ -108,6 +103,10 @@ <artifactId>scalatest_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> </dependencies> <build> diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java index a473303..7d9656e 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java @@ -91,8 +91,10 @@ public class CarbonHiveInputSplit extends FileSplit } public CarbonHiveInputSplit(String segmentId, Path path, long start, long length, - String[] locations, int numberOfBlocklets, ColumnarFormatVersion version) { + String[] locations, int numberOfBlocklets, ColumnarFormatVersion version, + BlockletDetailInfo detailInfo) { this(segmentId, path, start, length, locations, version); + this.detailInfo = detailInfo; this.numberOfBlocklets = numberOfBlocklets; } @@ -110,8 +112,8 @@ public class CarbonHiveInputSplit extends FileSplit */ public CarbonHiveInputSplit(String segmentId, Path path, long start, long length, String[] locations, int numberOfBlocklets, ColumnarFormatVersion version, - Map<String, String> blockStorageIdMap) { - this(segmentId, path, start, length, locations, numberOfBlocklets, version); + Map<String, String> blockStorageIdMap, BlockletDetailInfo detailInfo) { + this(segmentId, path, start, length, locations, numberOfBlocklets, version, detailInfo); this.blockStorageIdMap = blockStorageIdMap; } diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java index 3ca8cf1..df25e5e 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java @@ -58,7 +58,7 @@ import org.apache.hadoop.io.Writable; * It transparently passes the object to/from the Carbon file reader/writer. */ @SerDeSpec(schemaProps = { serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES }) -class CarbonHiveSerDe extends AbstractSerDe { +public class CarbonHiveSerDe extends AbstractSerDe { private final SerDeStats stats; private ObjectInspector objInspector; diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java index 1022576..64edae2 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java @@ -19,10 +19,11 @@ package org.apache.carbondata.hive; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.exception.InvalidConfigurationException; -import org.apache.carbondata.core.indexstore.BlockletDetailInfo; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.SchemaReader; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -72,7 +73,8 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl } else { if (paths != null) { for (String inputPath : inputPaths) { - if (paths.startsWith(inputPath.replace("file:", ""))) { + inputPath = inputPath.replace("file:", ""); + if (FileFactory.isFileExist(inputPath)) { validInputPath = inputPath; break; } @@ -101,8 +103,12 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl } @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { + jobConf.set(DATABASE_NAME, "_dummyDb_" + UUID.randomUUID().toString()); + jobConf.set(TABLE_NAME, "_dummyTable_" + UUID.randomUUID().toString()); org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf); - List<org.apache.hadoop.mapreduce.InputSplit> splitList = super.getSplits(jobContext); + CarbonTableInputFormat carbonTableInputFormat = new CarbonTableInputFormat(); + List<org.apache.hadoop.mapreduce.InputSplit> splitList = + carbonTableInputFormat.getSplits(jobContext); InputSplit[] splits = new InputSplit[splitList.size()]; CarbonInputSplit split; for (int i = 0; i < splitList.size(); i++) { @@ -110,13 +116,7 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl CarbonHiveInputSplit inputSplit = new CarbonHiveInputSplit(split.getSegmentId(), split.getPath(), split.getStart(), split.getLength(), split.getLocations(), split.getNumberOfBlocklets(), - split.getVersion(), split.getBlockStorageIdMap()); - BlockletDetailInfo info = new BlockletDetailInfo(); - info.setBlockSize(split.getLength()); - info.setBlockFooterOffset(split.getDetailInfo().getBlockFooterOffset()); - info.setVersionNumber(split.getVersion().number()); - info.setUseMinMaxForPruning(false); - inputSplit.setDetailInfo(info); + split.getVersion(), split.getBlockStorageIdMap(), split.getDetailInfo()); splits[i] = inputSplit; } return splits; diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java index f0071d4..427e248 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java @@ -19,20 +19,22 @@ package org.apache.carbondata.hive; import java.io.IOException; import java.util.Properties; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Progressable; /** * TODO : To extend CarbonOutputFormat */ -class MapredCarbonOutputFormat<T> extends FileOutputFormat<Void, T> +public class MapredCarbonOutputFormat<T> extends CarbonTableOutputFormat implements HiveOutputFormat<Void, T> { @Override @@ -41,6 +43,12 @@ class MapredCarbonOutputFormat<T> extends FileOutputFormat<Void, T> return null; } + @Override public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) + throws IOException { + org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf); + super.checkOutputSpecs(jobContext); + } + @Override public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java b/integration/hive/src/main/java/org/apache/carbondata/hive/test/server/HiveEmbeddedServer2.java similarity index 94% rename from integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java rename to integration/hive/src/main/java/org/apache/carbondata/hive/test/server/HiveEmbeddedServer2.java index 0b42ab9..17461b5 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/test/server/HiveEmbeddedServer2.java @@ -15,15 +15,13 @@ * limitations under the License. */ -package org.apache.carbondata.hive.server; +package org.apache.carbondata.hive.test.server; import java.io.File; import java.lang.reflect.Field; -import java.security.SecureRandom; import java.util.HashMap; import java.util.Map; import java.util.Properties; -import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -48,20 +46,22 @@ import org.apache.log4j.Logger; * a child JVM (which Hive calls local) or external. */ public class HiveEmbeddedServer2 { - private static final String SCRATCH_DIR = "/tmp/hive"; + private String SCRATCH_DIR = ""; private static final Logger log = LogServiceFactory.getLogService(Hive.class.getName()); private HiveServer2 hiveServer; private HiveConf config; private int port; - private static Random secureRandom = new SecureRandom(); - public void start() throws Exception { + public void start(String storePath) throws Exception { log.info("Starting Hive Local/Embedded Server..."); + SCRATCH_DIR = storePath; if (hiveServer == null) { config = configure(); hiveServer = new HiveServer2(); port = MetaStoreUtils.findFreePort(); config.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, port); + config.setBoolVar(ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE, true); + config.setBoolVar(ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true); hiveServer.init(config); hiveServer.start(); waitForStartup(); @@ -126,14 +126,12 @@ public class HiveEmbeddedServer2 { } } - int random = secureRandom.nextInt(); - - conf.set("hive.metastore.warehouse.dir", scratchDir + "/warehouse" + random); - conf.set("hive.metastore.metadb.dir", scratchDir + "/metastore_db" + random); + conf.set("hive.metastore.warehouse.dir", scratchDir + "/warehouse"); + conf.set("hive.metastore.metadb.dir", scratchDir + "/metastore_db"); conf.set("hive.exec.scratchdir", scratchDir); conf.set("fs.permissions.umask-mode", "022"); conf.set("javax.jdo.option.ConnectionURL", - "jdbc:derby:;databaseName=" + scratchDir + "/metastore_db" + random + ";create=true"); + "jdbc:derby:;databaseName=" + scratchDir + "/metastore_db" + ";create=true"); conf.set("hive.metastore.local", "true"); conf.set("hive.aux.jars.path", ""); conf.set("hive.added.jars.path", ""); diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml index 3fd2b03..75be897 100644 --- a/integration/spark-common-test/pom.xml +++ b/integration/spark-common-test/pom.xml @@ -109,6 +109,12 @@ <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-spark2</artifactId> <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + </exclusion> + </exclusions> <scope>test</scope> </dependency> <dependency> diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala index 7216134..49e8e98 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.util.SparkUtil import org.scalatest.BeforeAndAfterAll -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hive.MapredCarbonInputFormat class TestCreateHiveTableWithCarbonDS extends QueryTest with BeforeAndAfterAll { @@ -57,7 +57,7 @@ class TestCreateHiveTableWithCarbonDS extends QueryTest with BeforeAndAfterAll { if (SparkUtil.isSparkVersionEqualTo("2.2")) { assertResult(table.storage.locationUri.get)(new Path(s"file:$storeLocation/source").toUri) } - assertResult(table.storage.inputFormat.get)(classOf[CarbonTableInputFormat[_]].getName) + assertResult(table.storage.inputFormat.get)(classOf[MapredCarbonInputFormat].getName) } } diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml index 0a5f096..df683e0 100644 --- a/integration/spark-common/pom.xml +++ b/integration/spark-common/pom.xml @@ -37,6 +37,11 @@ <dependencies> <dependency> <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-hive</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-streaming</artifactId> <version>${project.version}</version> </dependency> diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala index bdacfcd..4fc30d02 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructField import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.hadoop.api.{CarbonTableInputFormat, CarbonTableOutputFormat} +import org.apache.carbondata.hive.{CarbonHiveSerDe, MapredCarbonInputFormat, MapredCarbonOutputFormat} /** * Reflection APIs @@ -357,14 +357,17 @@ object CarbonReflectionUtils { val updatedSerdeMap = serdeMap ++ Map[String, HiveSerDe]( ("org.apache.spark.sql.carbonsource", HiveSerDe(Some( - classOf[CarbonTableInputFormat[_]].getName), - Some(classOf[CarbonTableOutputFormat].getName))), + classOf[MapredCarbonInputFormat].getName), + Some(classOf[MapredCarbonOutputFormat[_]].getName), + Some(classOf[CarbonHiveSerDe].getName))), ("carbon", HiveSerDe(Some( - classOf[CarbonTableInputFormat[_]].getName), - Some(classOf[CarbonTableOutputFormat].getName))), + classOf[MapredCarbonInputFormat].getName), + Some(classOf[MapredCarbonOutputFormat[_]].getName), + Some(classOf[CarbonHiveSerDe].getName))), ("carbondata", HiveSerDe(Some( - classOf[CarbonTableInputFormat[_]].getName), - Some(classOf[CarbonTableOutputFormat].getName)))) + classOf[MapredCarbonInputFormat].getName), + Some(classOf[MapredCarbonOutputFormat[_]].getName), + Some(classOf[CarbonHiveSerDe].getName)))) instanceMirror.reflectField(field.asTerm).set(updatedSerdeMap) case _ => } diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryLRUCacheTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryLRUCacheTestCase.scala index 301644f..8cbae2e 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryLRUCacheTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryLRUCacheTestCase.scala @@ -78,6 +78,7 @@ class DictionaryLRUCacheTestCase extends Spark2QueryTest with BeforeAndAfterAll path = s"$resourcesPath/restructure/data_2000.csv" + sql("use default") sql("drop table if exists carbon_new1") sql("drop table if exists carbon_new2") sql("drop table if exists carbon_new3") diff --git a/pom.xml b/pom.xml index 6e3bde2..51c5e25 100644 --- a/pom.xml +++ b/pom.xml @@ -101,6 +101,7 @@ <module>hadoop</module> <module>integration/spark-common</module> <module>integration/spark-common-test</module> + <module>integration/hive</module> <module>datamap/examples</module> <module>store/sdk</module> <module>assembly</module>