[CARBONDATA-3154] Fix spark-2.1 test error Spark2.2.1 supports location, but Spark2.1.0 doesn't support location, supports options(path 'your file path'). So we should change location to options(path ... ) and create new directory before use "create table" in spark2.1.0.
This closes #2981 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3524f51d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3524f51d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3524f51d Branch: refs/heads/branch-1.5 Commit: 3524f51df087059f1fa7ccd87f2082e4d2d36c20 Parents: 442e244 Author: xubo245 <xub...@huawei.com> Authored: Sat Dec 8 00:01:43 2018 +0800 Committer: Raghunandan S <carbondatacontributi...@gmail.com> Committed: Mon Dec 17 18:50:23 2018 +0530 ---------------------------------------------------------------------- .../datasource/SparkCarbonDataSourceTest.scala | 112 +++++++++++++++---- 1 file changed, 91 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3524f51d/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala index c5d6a8c..470e0bf 100644 --- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala +++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala @@ -19,13 +19,14 @@ package org.apache.spark.sql.carbondata.datasource import java.io.File import java.util -import java.util.Arrays import scala.collection.JavaConverters._ import scala.collection.mutable +import org.apache.hadoop.fs.permission.{FsAction, FsPermission} import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.carbondata.datasource.TestUtil._ +import org.apache.spark.util.SparkUtil import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.apache.carbondata.core.datamap.DataMapStoreManager @@ -998,9 +999,19 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll { i += 1 } writer.close() - spark.sql("create table complextable (stringfield string, structfield struct<bytefield: " + - "byte, floatfield: float>) " + - s"using carbon location '$path'") + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + if (!FileFactory.isFileExist(path)) { + FileFactory.createDirectoryAndSetPermission(path, + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) + } + spark.sql("create table complextable (stringfield string, structfield struct<bytefield: " + + "byte, floatfield: float>) " + + s"using carbon options(path '$path')") + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + spark.sql("create table complextable (stringfield string, structfield struct<bytefield: " + + "byte, floatfield: float>) " + + s"using carbon location '$path'") + } } catch { case ex: Exception => throw new RuntimeException(ex) case _ => None @@ -1052,13 +1063,33 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll { writer.close() spark.sql("drop table if exists sorted_par") spark.sql("drop table if exists sort_table") - spark.sql(s"create table sort_table (age int, height double, name string, address string," + - s" salary long, bytefield byte) using carbon location '$path'") - FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2")) - spark.sql(s"create table sorted_par(age int, height double, name string, address " + - s"string," + - s"salary long, bytefield byte) using parquet location " + - s"'$warehouse1/../warehouse2'") + val path2 = s"$warehouse1/../warehouse2"; + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + if (!FileFactory.isFileExist(path)) { + FileFactory.createDirectoryAndSetPermission(path, + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) + } + spark.sql(s"create table sort_table (age int, height double, name string, address string," + + s" salary long, bytefield byte) using carbon options(path '$path')") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2")) + if (!FileFactory.isFileExist(path2)) { + FileFactory.createDirectoryAndSetPermission(path2, + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) + } + spark.sql(s"create table sorted_par(age int, height double, name string, address " + + s"string," + + s"salary long, bytefield byte) using parquet options(path " + + s"'$path2')") + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + spark.sql(s"create table sort_table (age int, height double, name string, address string," + + s" salary long, bytefield byte) using carbon location '$path'") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2")) + spark.sql(s"create table sorted_par(age int, height double, name string, address " + + s"string," + + s"salary long, bytefield byte) using parquet location " + + s"'$warehouse1/../warehouse2'") + } + (0 to 10).foreach { i => spark.sql(s"insert into sorted_par select '$i', ${ i.toDouble / 2 }, 'name$i', " + @@ -1098,10 +1129,21 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll { i += 1 } writer.close() - spark.sql(s"create table complextable (stringfield string, bytearray " + - s"array<byte>, floatarray array<float>) using carbon " + - s"location " + - s"'$path'") + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + if (!FileFactory.isFileExist(path)) { + FileFactory.createDirectoryAndSetPermission(path, + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) + } + spark.sql(s"create table complextable (stringfield string, bytearray " + + s"array<byte>, floatarray array<float>) using carbon " + + s"options( path " + + s"'$path')") + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + spark.sql(s"create table complextable (stringfield string, bytearray " + + s"array<byte>, floatarray array<float>) using carbon " + + s"location " + + s"'$path'") + } } catch { case ex: Exception => throw new RuntimeException(ex) case _ => None @@ -1123,9 +1165,20 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll { private def createParquetTable { val path = FileFactory.getUpdatedFilePath(s"$warehouse1/../warehouse2") FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$path")) - spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " + - s"string," + - s"salary long, floatField float, bytefield byte) using parquet location '$path'") + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + if (!FileFactory.isFileExist(path)) { + FileFactory.createDirectoryAndSetPermission(path, + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) + } + spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " + + s"string," + + s"salary long, floatField float, bytefield byte) using parquet options(path '$path')") + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " + + s"string," + + s"salary long, floatField float, bytefield byte) using parquet location '$path'") + } + (0 to 10).foreach { i => spark.sql(s"insert into par_table select 'true','$i', ${i.toDouble / 2}, 'name$i', " + s"'address$i', ${i*100}, $i.$i, '$i'") @@ -1237,7 +1290,15 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll { val rowCount = 3 buildStructSchemaWithNestedArrayOfMapTypeAsValue(writerPath, rowCount) spark.sql("drop table if exists carbon_external") - spark.sql(s"create table carbon_external using carbon location '$writerPath'") + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + if (!FileFactory.isFileExist(writerPath)) { + FileFactory.createDirectoryAndSetPermission(writerPath, + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) + } + spark.sql(s"create table carbon_external using carbon options(path '$writerPath')") + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + spark.sql(s"create table carbon_external using carbon location '$writerPath'") + } assert(spark.sql("select * from carbon_external").count() == rowCount) spark.sql("drop table if exists carbon_external") } @@ -1268,8 +1329,17 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll { i += 1 } writer.close() - spark.sql(s"create table multi_page (a string, b float, c byte) using carbon location " + - s"'$path'") + if (SparkUtil.isSparkVersionEqualTo("2.1")) { + if (!FileFactory.isFileExist(path)) { + FileFactory.createDirectoryAndSetPermission(path, + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) + } + spark.sql(s"create table multi_page (a string, b float, c byte) using carbon options(path " + + s"'$path')") + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + spark.sql(s"create table multi_page (a string, b float, c byte) using carbon location " + + s"'$path'") + } assert(spark.sql("select * from multi_page").count() == 33000) } catch { case ex: Exception => throw new RuntimeException(ex)