This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
commit e7ad177c40f0c34e9120ddb13ac337486c282800
Author: Istvan Toth <st...@apache.org>
AuthorDate: Thu Nov 23 21:06:13 2023 +0100

    PHOENIX-7065 Spark3 connector tests fail with Spark 3.4.1

    Fix test classpath issue
    use spark.hadoopRDD.ignoreEmptySplits=false for Spark3 tests
---
 phoenix5-spark/README.md                            | 18 +++++++++++++-----
 phoenix5-spark/pom.xml                              | 13 +++++++++++++
 .../org/apache/phoenix/spark/DataSourceApiIT.java   |  6 ++++--
 .../it/java/org/apache/phoenix/spark/SparkUtil.java |  3 ++-
 .../phoenix/spark/AbstractPhoenixSparkIT.scala      |  2 ++
 phoenix5-spark3-it/pom.xml                          | 15 +++++++++++++--
 .../phoenix/spark/AbstractPhoenixSparkIT.scala      |  1 +
 phoenix5-spark3/README.md                           | 21 ++++++++++++++++-----
 phoenix5-spark3/pom.xml                             |  4 ++--
 pom.xml                                             |  6 ++++--
 10 files changed, 70 insertions(+), 19 deletions(-)

diff --git a/phoenix5-spark/README.md b/phoenix5-spark/README.md
index f443cb0..73d68c2 100644
--- a/phoenix5-spark/README.md
+++ b/phoenix5-spark/README.md
@@ -38,6 +38,7 @@ val spark = SparkSession
   .builder()
   .appName("phoenix-test")
   .master("local")
+  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
   .getOrCreate()
 
 // Load data from TABLE1
@@ -62,7 +63,8 @@ import org.apache.spark.sql.SQLContext;
 public class PhoenixSparkRead {
 
     public static void main() throws Exception {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+            .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
 
@@ -109,6 +111,7 @@ val spark = SparkSession
   .builder()
   .appName("phoenix-test")
   .master("local")
+  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
   .getOrCreate()
 
 // Load INPUT_TABLE
@@ -137,7 +140,8 @@ import org.apache.spark.sql.SQLContext;
 public class PhoenixSparkWriteFromInputTable {
 
     public static void main() throws Exception {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+            .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
 
@@ -183,6 +187,7 @@ val spark = SparkSession
   .builder()
   .appName("phoenix-test")
   .master("local")
+  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
   .getOrCreate()
 
 val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
@@ -223,7 +228,8 @@ import java.util.List;
 public class PhoenixSparkWriteFromRDDWithSchema {
 
     public static void main() throws Exception {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+            .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
         SparkSession spark = sqlContext.sparkSession();
@@ -306,7 +312,8 @@ import org.apache.phoenix.spark._
 
 val configuration = new Configuration()
 // Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
-val sc = new SparkContext("local", "phoenix-test")
+val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
+val sc = new SparkContext("local", "phoenix-test", sparkConf)
 val sqlContext = new SQLContext(sc)
 
 // Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
@@ -324,7 +331,8 @@ import org.apache.spark.sql.SQLContext
 import org.apache.phoenix.spark._
 import org.apache.spark.rdd.RDD
 
-val sc = new SparkContext("local", "phoenix-test")
+val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
+val sc = new SparkContext("local", "phoenix-test", sparkConf)
 
 // Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
 val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
diff --git a/phoenix5-spark/pom.xml b/phoenix5-spark/pom.xml
index cd33ca3..78eedfa 100644
--- a/phoenix5-spark/pom.xml
+++ b/phoenix5-spark/pom.xml
@@ -36,6 +36,8 @@
   <properties>
     <top.dir>${project.basedir}/..</top.dir>
     <skip-scala-tests>true</skip-scala-tests>
+    <scala.version>${scala.version.for.spark2}</scala.version>
+    <scala.binary.version>${scala.binary.version.for.spark2}</scala.binary.version>
   </properties>
 
   <dependencies>
@@ -58,6 +60,17 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
+      <exclusions>
+        <!-- The shaded hadoop-client libraries conflict with hbase-shaded-mapreduce -->
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client-runtime</artifactId>
+        </exclusion>
+      </exclusions>
       <scope>provided</scope>
     </dependency>
     <dependency>
diff --git a/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
index c6a4465..bc2637d 100644
--- a/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
+++ b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
@@ -70,7 +70,8 @@ public class DataSourceApiIT extends ParallelStatsDisabledIT {
 
     @Test
     public void basicWriteAndReadBackTest() throws SQLException {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+                .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
         String tableName = generateUniqueName();
@@ -165,7 +166,8 @@ public class DataSourceApiIT extends ParallelStatsDisabledIT {
     @Test
     @Ignore // Spark3 seems to be unable to handle mixed case colum names
     public void lowerCaseWriteTest() throws SQLException {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+                .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
         String tableName = generateUniqueName();
diff --git a/phoenix5-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
index 1c36f01..f00dadb 100644
--- a/phoenix5-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
+++ b/phoenix5-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
@@ -45,7 +45,8 @@ public class SparkUtil {
 
     public static SparkSession getSparkSession() {
         return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS)
-                .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate();
+                .config(UI_SHOW_CONSOLE_PROGRESS, false)
+                .config("spark.hadoopRDD.ignoreEmptySplits", false).getOrCreate();
     }
 
     public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder, String url, Configuration config)
diff --git a/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala b/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
index a10d303..1b24403 100644
--- a/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
+++ b/phoenix5-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
@@ -103,6 +103,7 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter
     // We pass in a TenantId to allow the DDL to create tenant-specific tables/views
     setupTables("tenantSetup.sql", Some(TenantId))
 
+    //FIXME is this ever used ?
     val conf = new SparkConf()
       .setAppName("PhoenixSparkIT")
       .setMaster("local[2]") // 2 threads, some parallelism
@@ -113,6 +114,7 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter
       .appName("PhoenixSparkIT")
       .master("local[2]") // 2 threads, some parallelism
       .config("spark.ui.showConsoleProgress", "false")
+      .config("spark.hadoopRDD.ignoreEmptySplits", "false")
       .getOrCreate()
   }
 
diff --git a/phoenix5-spark3-it/pom.xml b/phoenix5-spark3-it/pom.xml
index 3d8c25b..5ce08df 100644
--- a/phoenix5-spark3-it/pom.xml
+++ b/phoenix5-spark3-it/pom.xml
@@ -35,8 +35,8 @@
   <properties>
     <top.dir>${project.basedir}/..</top.dir>
     <spark.version>${spark3.version}</spark.version>
-    <scala.version>2.12.10</scala.version>
-    <scala.binary.version>2.12</scala.binary.version>
+    <scala.version>${scala.version.for.spark3}</scala.version>
+    <scala.binary.version>${scala.binary.version.for.spark3}</scala.binary.version>
   </properties>
 
   <dependencies>
@@ -47,6 +47,17 @@
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
       <scope>provided</scope>
+      <exclusions>
+        <!-- The shaded hadoop-client libraries conflict with the minicluster -->
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client-runtime</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git a/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala b/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
index 3308b82..12e679b 100644
--- a/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
+++ b/phoenix5-spark3-it/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
@@ -108,6 +108,7 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter
       .appName("PhoenixSparkIT")
       .master("local[2]") // 2 threads, some parallelism
       .config("spark.ui.showConsoleProgress", "false")
+      .config("spark.hadoopRDD.ignoreEmptySplits", "false")
       .getOrCreate()
   }
 
diff --git a/phoenix5-spark3/README.md b/phoenix5-spark3/README.md
index 824886d..ec7684a 100644
--- a/phoenix5-spark3/README.md
+++ b/phoenix5-spark3/README.md
@@ -48,6 +48,7 @@ val spark = SparkSession
   .builder()
   .appName("phoenix-test")
   .master("local")
+  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
   .getOrCreate()
 
 // Load data from TABLE1
@@ -72,7 +73,8 @@ import org.apache.spark.sql.SQLContext;
 public class PhoenixSparkRead {
 
     public static void main() throws Exception {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+            .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
 
@@ -119,6 +121,7 @@ val spark = SparkSession
   .builder()
   .appName("phoenix-test")
   .master("local")
+  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
   .getOrCreate()
 
 // Load INPUT_TABLE
@@ -147,7 +150,8 @@ import org.apache.spark.sql.SQLContext;
 public class PhoenixSparkWriteFromInputTable {
 
     public static void main() throws Exception {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+            .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
 
@@ -193,6 +197,7 @@ val spark = SparkSession
   .builder()
   .appName("phoenix-test")
   .master("local")
+  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
   .getOrCreate()
 
 val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
@@ -233,7 +238,8 @@ import java.util.List;
 public class PhoenixSparkWriteFromRDDWithSchema {
 
     public static void main() throws Exception {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
+            .set("spark.hadoopRDD.ignoreEmptySplits", "false");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         SQLContext sqlContext = new SQLContext(jsc);
         SparkSession spark = sqlContext.sparkSession();
@@ -308,6 +314,7 @@ create the DataFrame or RDD directly if you need fine-grained configuration.
 ### Load as a DataFrame directly using a Configuration object
 ```scala
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
 import org.apache.phoenix.spark._
@@ -315,7 +322,8 @@ import org.apache.phoenix.spark._
 
 val configuration = new Configuration()
 // Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
-val sc = new SparkContext("local", "phoenix-test")
+val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
+val sc = new SparkContext("local", "phoenix-test", sparkConf)
 val sqlContext = new SQLContext(sc)
 
 // Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
@@ -328,6 +336,7 @@ df.show
 
 ### Load as an RDD, using a Zookeeper URL
 ```scala
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
 import org.apache.phoenix.spark._
@@ -358,10 +367,12 @@ CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, co
 ```
 
 ```scala
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.phoenix.spark._
 
-val sc = new SparkContext("local", "phoenix-test")
+val sparkConf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
+val sc = new SparkContext("local", "phoenix-test", sparkConf)
 val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
 
 sc
diff --git a/phoenix5-spark3/pom.xml b/phoenix5-spark3/pom.xml
index 1b583e6..f2cec4a 100644
--- a/phoenix5-spark3/pom.xml
+++ b/phoenix5-spark3/pom.xml
@@ -35,8 +35,8 @@
   <properties>
     <top.dir>${project.basedir}/..</top.dir>
    <spark.version>${spark3.version}</spark.version>
-    <scala.version>2.12.10</scala.version>
-    <scala.binary.version>2.12</scala.binary.version>
+    <scala.version>${scala.version.for.spark3}</scala.version>
+    <scala.binary.version>${scala.binary.version.for.spark3}</scala.binary.version>
     <jodatime.version>2.10.5</jodatime.version>
   </properties>
 
diff --git a/pom.xml b/pom.xml
index b795ff9..5f796e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,9 +98,11 @@
     <hive3-storage.version>2.7.0</hive3-storage.version>
     <hive-storage.version>${hive3-storage.version}</hive-storage.version>
     <spark.version>2.4.0</spark.version>
+    <scala.version.for.spark2>2.11.12</scala.version.for.spark2>
+    <scala.binary.version.for.spark2>2.11</scala.binary.version.for.spark2>
     <spark3.version>3.0.3</spark3.version>
-    <scala.version>2.11.12</scala.version>
-    <scala.binary.version>2.11</scala.binary.version>
+    <scala.version.for.spark3>2.12.18</scala.version.for.spark3>
+    <scala.binary.version.for.spark3>2.12</scala.binary.version.for.spark3>
     <log4j.version>1.2.17</log4j.version>
     <log4j2.version>2.18.0</log4j2.version>
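For context when reading the patch: Spark 3.2 and later (SPARK-32083) changed the default of spark.hadoopRDD.ignoreEmptySplits to true, and the commit pins it back to false wherever the tests and README examples build a session. A minimal, self-contained sketch of that workaround, assuming an illustrative app name and local master:

```scala
import org.apache.spark.sql.SparkSession

object IgnoreEmptySplitsExample {
  def main(args: Array[String]): Unit = {
    // Pin spark.hadoopRDD.ignoreEmptySplits back to false, as the
    // Spark3 connector tests do, so empty input splits are not dropped.
    val spark = SparkSession
      .builder()
      .appName("phoenix-test") // illustrative
      .master("local")
      .config("spark.hadoopRDD.ignoreEmptySplits", "false")
      .getOrCreate()

    // Verify the override took effect
    println(spark.conf.get("spark.hadoopRDD.ignoreEmptySplits")) // prints: false
    spark.stop()
  }
}
```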