This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new b383941  [SEDONA-73] Add support for Scala 2.13 build (#562)
b383941 is described below

commit b38394197dede5700d5b5f959d905ed39929681e
Author: Adam Binford <[email protected]>
AuthorDate: Fri Nov 12 18:02:28 2021 -0500

    [SEDONA-73] Add support for Scala 2.13 build (#562)
---
 .github/workflows/java.yml                         |   3 +
 .github/workflows/r.yml                            |   6 +-
 pom.xml                                            |  20 ++
 .../adapters/EnvelopeAdapter.scala                 |   2 +-
 .../adapters/PythonConverter.scala                 |   6 +-
 .../translation/GeometrySeqToPythonConverter.scala |   2 +-
 .../utils/PythonAdapterWrapper.scala               |   6 +-
 .../sedona/python/wrapper/GeometrySample.scala     |   3 +-
 .../python/wrapper/TestToPythonSerialization.scala |   2 +-
 .../apache/sedona/sql/SedonaSqlExtensions.scala    |   2 +-
 .../org/apache/sedona/sql/utils/Adapter.scala      |  10 +-
 .../sql/sedona_sql/expressions/Functions.scala     |  11 +-
 .../sedona_sql/expressions/raster/Functions.scala  | 332 +++++----------------
 .../strategy/join/SpatialIndexExec.scala           |   2 +-
 .../org/apache/sedona/sql/GeometrySample.scala     |   3 +-
 .../org/apache/sedona/sql/adapterTestScala.scala   |   6 +-
 .../sedona/sql/functions/TestStSubDivide.scala     |   3 +-
 .../org/apache/sedona/sql/rasteralgebraTest.scala  |   2 +-
 .../sedona/viz/sql/SedonaVizExtensions.scala       |   2 +-
 .../sedona/viz/sql/operator/VizPartitioner.scala   |   4 +-
 .../sql/sedona_viz/expressions/Pixelize.scala      |   2 +-
 21 files changed, 136 insertions(+), 293 deletions(-)

diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml
index 93f8705..ebc5f00 100644
--- a/.github/workflows/java.yml
+++ b/.github/workflows/java.yml
@@ -23,6 +23,9 @@ jobs:
             scala: 2.11.8
           - spark: 3.0.3
             scala: 2.11.8
+        include:
+          - spark: 3.2.0
+            scala: 2.13.5
 
     steps:
     - uses: actions/checkout@v2
diff --git a/.github/workflows/r.yml b/.github/workflows/r.yml
index b4396a0..9c554e9 100644
--- a/.github/workflows/r.yml
+++ b/.github/workflows/r.yml
@@ -16,7 +16,7 @@ jobs:
       fail-fast: true
       matrix:
         spark: [2.4.8, 3.0.3, 3.1.2, 3.2.0]
-        scala: [2.11.8, 2.12.8]
+        scala: [2.11.8, 2.12.15]
         r: [oldrel, release]
         exclude:
           - spark: 3.2.0
@@ -38,10 +38,10 @@ jobs:
             scala: 2.11.8
             r: release
           - spark: 2.4.8
-            scala: 2.12.8
+            scala: 2.12.15
             r: oldrel
           - spark: 2.4.8
-            scala: 2.12.8
+            scala: 2.12.15
             r: release
     env:
       SPARK_VERSION: ${{ matrix.spark }}
diff --git a/pom.xml b/pom.xml
index 21ba1f1..a106b76 100644
--- a/pom.xml
+++ b/pom.xml
@@ -218,6 +218,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.scala-lang.modules</groupId>
+            
<artifactId>scala-collection-compat_${scala.compat.version}</artifactId>
+            <version>2.5.0</version>
+        </dependency>
         <!-- Test -->
         <dependency>
             <groupId>junit</groupId>
@@ -548,6 +553,21 @@
             </properties>
         </profile>
         <profile>
+            <id>scala2.13</id>
+            <activation>
+                <property>
+                    <name>scala</name>
+                    <value>2.13</value>
+                </property>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <properties>
+                <scala.version>2.13.5</scala.version>
+                <scala.compat.version>2.13</scala.compat.version>
+                <scaladoc.arg>-no-java-comments</scaladoc.arg>
+            </properties>
+        </profile>
+        <profile>
             <id>scala2.12</id>
             <activation>
                 <property>
diff --git 
a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/adapters/EnvelopeAdapter.scala
 
b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/adapters/EnvelopeAdapter.scala
index b1e07d2..e9f4657 100644
--- 
a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/adapters/EnvelopeAdapter.scala
+++ 
b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/adapters/EnvelopeAdapter.scala
@@ -23,7 +23,7 @@ import net.razorvine.pickle.Unpickler
 import net.razorvine.pickle.objects.ClassDict
 import org.locationtech.jts.geom.Envelope
 
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
 
 object EnvelopeAdapter {
   def getFromPython(bytes: Array[Byte]): java.util.List[Envelope] = {
diff --git 
a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/adapters/PythonConverter.scala
 
b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/adapters/PythonConverter.scala
index 8065a55..11df6d4 100644
--- 
a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/adapters/PythonConverter.scala
+++ 
b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/adapters/PythonConverter.scala
@@ -23,7 +23,7 @@ import 
org.apache.sedona.python.wrapper.translation.{FlatPairRddConverter, Geome
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
 import org.locationtech.jts.geom.Geometry
 
-import scala.collection.convert.Wrappers.SeqWrapper
+import scala.jdk.CollectionConverters._
 
 object PythonConverter extends GeomSerializer {
 
@@ -39,6 +39,6 @@ object PythonConverter extends GeomSerializer {
   def translatePythonRDDToJava(pythonRDD: JavaRDD[Array[Byte]]): 
JavaRDD[Geometry] =
     PythonRDDToJavaConverter(pythonRDD, geometrySerializer).translateToJava
 
-  def translateGeometrySeqToPython(spatialData: SeqWrapper[Geometry]): 
Array[Array[Byte]] =
-    GeometrySeqToPythonConverter(spatialData, 
geometrySerializer).translateToPython
+  def translateGeometrySeqToPython(spatialData: java.util.List[Geometry]): 
Array[Array[Byte]] =
+    GeometrySeqToPythonConverter(spatialData.asScala.toSeq, 
geometrySerializer).translateToPython
 }
diff --git 
a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/GeometrySeqToPythonConverter.scala
 
b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/GeometrySeqToPythonConverter.scala
index 9fa360d..237b588 100644
--- 
a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/GeometrySeqToPythonConverter.scala
+++ 
b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/GeometrySeqToPythonConverter.scala
@@ -22,7 +22,7 @@ package org.apache.sedona.python.wrapper.translation
 import org.apache.sedona.python.wrapper.utils.implicits._
 import org.locationtech.jts.geom.Geometry
 
-case class GeometrySeqToPythonConverter(spatialData: 
scala.collection.convert.Wrappers.SeqWrapper[Geometry],
+case class GeometrySeqToPythonConverter(spatialData: Seq[Geometry],
                                         geometrySerializer: 
PythonGeometrySerializer) {
 
   def translateToPython: Array[Array[Byte]] = {
diff --git 
a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/utils/PythonAdapterWrapper.scala
 
b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/utils/PythonAdapterWrapper.scala
index f3a6b0b..62ec871 100644
--- 
a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/utils/PythonAdapterWrapper.scala
+++ 
b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/utils/PythonAdapterWrapper.scala
@@ -24,17 +24,17 @@ import org.apache.spark.api.java.JavaPairRDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.locationtech.jts.geom.Geometry
 
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
 
 object PythonAdapterWrapper {
   def toDf[T <: Geometry](spatialRDD: SpatialRDD[T], fieldNames: 
java.util.ArrayList[String], sparkSession: SparkSession): DataFrame = {
-    Adapter.toDf(spatialRDD, fieldNames.asScala, sparkSession)
+    Adapter.toDf(spatialRDD, fieldNames.asScala.toSeq, sparkSession)
   }
 
   def toDf(spatialPairRDD: JavaPairRDD[Geometry, Geometry],
            leftFieldnames: java.util.ArrayList[String],
            rightFieldNames: java.util.ArrayList[String],
            sparkSession: SparkSession): DataFrame = {
-    Adapter.toDf(spatialPairRDD, leftFieldnames.asScala, 
rightFieldNames.asScala, sparkSession)
+    Adapter.toDf(spatialPairRDD, leftFieldnames.asScala.toSeq, 
rightFieldNames.asScala.toSeq, sparkSession)
   }
 }
diff --git 
a/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/GeometrySample.scala
 
b/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/GeometrySample.scala
index 31c7bc5..ea5078e 100644
--- 
a/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/GeometrySample.scala
+++ 
b/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/GeometrySample.scala
@@ -21,8 +21,7 @@ package org.apache.sedona.python.wrapper
 
 import org.locationtech.jts.geom.Geometry
 
-import java.io.FileInputStream
-import scala.tools.nsc.interpreter.InputStream
+import java.io.{FileInputStream, InputStream}
 import scala.io.Source
 
 trait GeometrySample extends PythonTestSpec {
diff --git 
a/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/TestToPythonSerialization.scala
 
b/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/TestToPythonSerialization.scala
index 96503c8..ab61985 100644
--- 
a/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/TestToPythonSerialization.scala
+++ 
b/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/TestToPythonSerialization.scala
@@ -23,7 +23,7 @@ import 
org.apache.sedona.python.wrapper.translation.{FlatPairRddConverter, Geome
 import org.apache.spark.api.java.JavaPairRDD
 import org.scalatest.Matchers
 import org.apache.sedona.python.wrapper.utils.implicits._
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
 
 
 class TestToPythonSerialization extends SparkUtil with GeometrySample with 
Matchers {
diff --git a/sql/src/main/scala/org/apache/sedona/sql/SedonaSqlExtensions.scala 
b/sql/src/main/scala/org/apache/sedona/sql/SedonaSqlExtensions.scala
index b08aca3..e3b741a 100644
--- a/sql/src/main/scala/org/apache/sedona/sql/SedonaSqlExtensions.scala
+++ b/sql/src/main/scala/org/apache/sedona/sql/SedonaSqlExtensions.scala
@@ -26,7 +26,7 @@ class SedonaSqlExtensions extends (SparkSessionExtensions => 
Unit) {
   def apply(e: SparkSessionExtensions): Unit = {
     e.injectCheckRule(spark => {
       SedonaSQLRegistrator.registerAll(spark)
-      _ => Unit
+      _ => ()
     })
   }
 }
\ No newline at end of file
diff --git a/sql/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala 
b/sql/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
index 761153e..85a9986 100644
--- a/sql/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
+++ b/sql/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
@@ -57,8 +57,8 @@ object Adapter {
   def toSpatialRdd(dataFrame: DataFrame, geometryFieldName: String, 
fieldNames: Seq[String]): SpatialRDD[Geometry] = {
     var spatialRDD = new SpatialRDD[Geometry]
     spatialRDD.rawSpatialRDD = toRdd(dataFrame, geometryFieldName).toJavaRDD()
-    import scala.collection.JavaConversions._
-    if (fieldNames != null && fieldNames.nonEmpty) spatialRDD.fieldNames = 
fieldNames
+    import scala.jdk.CollectionConverters._
+    if (fieldNames != null && fieldNames.nonEmpty) spatialRDD.fieldNames = 
fieldNames.asJava
     else spatialRDD.fieldNames = null
     spatialRDD
   }
@@ -81,8 +81,8 @@ object Adapter {
   def toSpatialRdd(dataFrame: DataFrame, geometryColId: Int, fieldNames: 
Seq[String]): SpatialRDD[Geometry] = {
     var spatialRDD = new SpatialRDD[Geometry]
     spatialRDD.rawSpatialRDD = toRdd(dataFrame, geometryColId).toJavaRDD()
-    import scala.collection.JavaConversions._
-    if (fieldNames.nonEmpty) spatialRDD.fieldNames = fieldNames
+    import scala.jdk.CollectionConverters._
+    if (fieldNames.nonEmpty) spatialRDD.fieldNames = fieldNames.asJava
     else spatialRDD.fieldNames = null
     spatialRDD
   }
@@ -107,7 +107,7 @@ object Adapter {
   }
 
   def toDf[T <: Geometry](spatialRDD: SpatialRDD[T], sparkSession: 
SparkSession): DataFrame = {
-    import scala.collection.JavaConverters._
+    import scala.jdk.CollectionConverters._
     if (spatialRDD.fieldNames != null) return toDf(spatialRDD, 
spatialRDD.fieldNames.asScala.toList, sparkSession)
     toDf(spatialRDD, null, sparkSession);
   }
diff --git 
a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
 
b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
index bb88333..b5894b8 100644
--- 
a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
+++ 
b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
@@ -369,26 +369,25 @@ case class ST_MakeValid(inputExpressions: Seq[Expression])
     val removeHoles = inputExpressions(1).eval(input).asInstanceOf[Boolean]
 
     // in order to do flatMap on java collections(util.List[Polygon])
-    import scala.collection.JavaConversions._
+    import scala.jdk.CollectionConverters._
 
     // makeValid works only on polygon or multipolygon
     if (!geometry.getGeometryType.equalsIgnoreCase("POLYGON") && 
!geometry.getGeometryType.equalsIgnoreCase("MULTIPOLYGON")) {
       throw new IllegalArgumentException("ST_MakeValid works only on Polygons 
and MultiPolygons")
     }
 
-    val validGeometry: util.List[Polygon] = geometry match {
+    val validGeometry = geometry match {
       case g: MultiPolygon =>
         (0 until g.getNumGeometries).flatMap(i => {
           val polygon = g.getGeometryN(i).asInstanceOf[Polygon]
-          val fixedPolygons = JTS.makeValid(polygon, removeHoles)
-          fixedPolygons
+          JTS.makeValid(polygon, removeHoles).asScala.iterator
         })
       case g: Polygon =>
-        JTS.makeValid(g, removeHoles)
+        JTS.makeValid(g, removeHoles).asScala.iterator
       case _ => Nil
     }
 
-    val result = validGeometry.toArray.map(g => {
+    val result = validGeometry.map(g => {
       val serializedGeometry = 
GeometrySerializer.serialize(g.asInstanceOf[Geometry])
       InternalRow(new GenericArrayData(serializedGeometry))
     })
diff --git 
a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/Functions.scala
 
b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/Functions.scala
index 672c88c..961edaf 100644
--- 
a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/Functions.scala
+++ 
b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/Functions.scala
@@ -22,7 +22,7 @@ package org.apache.spark.sql.sedona_sql.expressions.raster
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeArrayData}
-import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
 import org.apache.spark.sql.sedona_sql.expressions.UserDataGeneratator
 import org.apache.spark.sql.types._
 
@@ -31,23 +31,14 @@ import org.apache.spark.sql.types._
 /// Calculate Normalized Difference between two bands
 case class RS_NormalizedDifference(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  // This is an expression which takes one input expressions
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band1:Array[Double] = null
-    var band2:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
-      inputExpressions(1).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
-    ) {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     val ndvi = normalizeddifference(band1, band2)
 
     new GenericArrayData(ndvi)
@@ -82,18 +73,13 @@ case class RS_NormalizedDifference(inputExpressions: 
Seq[Expression])
 // Calculate mean value for a particular band
 case class RS_Mean(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  // This is an expression which takes one input expressions
+  assert(inputExpressions.length == 1)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 1)
-    var band:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     val mean = calculateMean(band)
     mean
   }
@@ -116,29 +102,22 @@ case class RS_Mean(inputExpressions: Seq[Expression])
 // Calculate mode of a particular band
 case class RS_Mode(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  // This is an expression which takes one input expressions
+  assert(inputExpressions.length == 1)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 1)
-    var band:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    var band = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     val mode = calculateMode(band)
     new GenericArrayData(mode)
   }
 
   private def calculateMode(band:Array[Double]):Array[Double] = {
-
     val grouped = band.toList.groupBy(x => x).mapValues(_.size)
     val modeValue = grouped.maxBy(_._2)._2
     val modes = grouped.filter(_._2 == modeValue).map(_._1)
     modes.toArray
-
   }
   override def dataType: DataType = ArrayType(DoubleType)
 
@@ -152,20 +131,14 @@ case class RS_Mode(inputExpressions: Seq[Expression])
 // fetch a particular region from a raster image given particular 
indexes(Array[minx...maxX][minY...maxY])
 case class RS_FetchRegion(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 3)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 3)
-    var band:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
-    val coordinates =  
inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toIntArray()
-    val dim = 
inputExpressions(2).eval(inputRow).asInstanceOf[GenericArrayData].toIntArray()
+    val band = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val coordinates =  
inputExpressions(1).eval(inputRow).asInstanceOf[ArrayData].toIntArray()
+    val dim = 
inputExpressions(2).eval(inputRow).asInstanceOf[ArrayData].toIntArray()
     new GenericArrayData(regionEnclosed(band, coordinates,dim))
 
   }
@@ -197,18 +170,12 @@ case class RS_FetchRegion(inputExpressions: 
Seq[Expression])
 // Mark all the band values with 1 which are greater than a particular 
threshold
 case class RS_GreaterThan(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     val target = 
inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
     new GenericArrayData(findGreaterThan(band, target))
 
@@ -240,18 +207,12 @@ case class RS_GreaterThan(inputExpressions: 
Seq[Expression])
 // Mark all the band values with 1 which are greater than or equal to a 
particular threshold
 case class RS_GreaterThanEqual(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     val target = 
inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
     new GenericArrayData(findGreaterThanEqual(band, target))
 
@@ -283,18 +244,12 @@ case class RS_GreaterThanEqual(inputExpressions: 
Seq[Expression])
 // Mark all the band values with 1 which are less than a particular threshold
 case class RS_LessThan(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     val target = 
inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
     new GenericArrayData(findLessThan(band, target))
 
@@ -326,18 +281,12 @@ case class RS_LessThan(inputExpressions: Seq[Expression])
 // Mark all the band values with 1 which are less than or equal to a 
particular threshold
 case class RS_LessThanEqual(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     val target = 
inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
     new GenericArrayData(findLessThanEqual(band, target))
 
@@ -369,22 +318,14 @@ case class RS_LessThanEqual(inputExpressions: 
Seq[Expression])
 // Count number of occurences of a particular value in a band
 case class RS_Count(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
-  override def nullable: Boolean = false
+  assert(inputExpressions.length == 2)
 
-  override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+  override def nullable: Boolean = false
 
+  override def eval(inputRow: InternalRow): Any = {    
+    val band = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     val target = 
inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
     findCount(band, target)
-
   }
 
   private def findCount(band: Array[Double], target: Double):Int = {
@@ -411,18 +352,12 @@ case class RS_Count(inputExpressions: Seq[Expression])
 // Multiply a factor to all values of a band
 case class RS_MultiplyFactor(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     val target = inputExpressions(1).eval(inputRow).asInstanceOf[Int]
     new GenericArrayData(multiply(band, target))
 
@@ -452,28 +387,16 @@ case class RS_MultiplyFactor(inputExpressions: 
Seq[Expression])
 // Add two bands
 case class RS_AddBands(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band1:Array[Double] = null
-    var band2:Array[Double] = null
-
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
-      inputExpressions(1).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
-    ) {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     assert(band1.length == band2.length)
 
     new GenericArrayData(addBands(band1, band2))
-
   }
 
   private def addBands(band1: Array[Double], band2: 
Array[Double]):Array[Double] = {
@@ -498,27 +421,16 @@ case class RS_AddBands(inputExpressions: Seq[Expression])
 // Subtract two bands
 case class RS_SubtractBands(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band1:Array[Double] = null
-    var band2:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
-      inputExpressions(1).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
-    ) {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     assert(band1.length == band2.length)
 
     new GenericArrayData(subtractBands(band1, band2))
-
   }
 
   private def subtractBands(band1: Array[Double], band2: 
Array[Double]):Array[Double] = {
@@ -543,27 +455,16 @@ case class RS_SubtractBands(inputExpressions: 
Seq[Expression])
 // Multiply two bands
 case class RS_MultiplyBands(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band1:Array[Double] = null
-    var band2:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
-      inputExpressions(1).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
-    ) {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     assert(band1.length == band2.length)
 
     new GenericArrayData(multiplyBands(band1, band2))
-
   }
 
   private def multiplyBands(band1: Array[Double], band2: 
Array[Double]):Array[Double] = {
@@ -588,27 +489,16 @@ case class RS_MultiplyBands(inputExpressions: 
Seq[Expression])
 // Divide two bands
 case class RS_DivideBands(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band1:Array[Double] = null
-    var band2:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
-      inputExpressions(1).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
-    ) {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     assert(band1.length == band2.length)
 
     new GenericArrayData(divideBands(band1, band2))
-
   }
 
   private def divideBands(band1: Array[Double], band2: 
Array[Double]):Array[Double] = {
@@ -633,23 +523,15 @@ case class RS_DivideBands(inputExpressions: 
Seq[Expression])
 // Modulo of a band
 case class RS_Modulo(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     val dividend = 
inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
 
-
     new GenericArrayData(modulo(band, dividend))
-
   }
 
   private def modulo(band: Array[Double], dividend:Double):Array[Double] = {
@@ -674,18 +556,12 @@ case class RS_Modulo(inputExpressions: Seq[Expression])
 // Square root of values in a band
 case class RS_SquareRoot(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 1)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 1)
-    var band:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     new GenericArrayData(squareRoot(band))
 
   }
@@ -712,28 +588,16 @@ case class RS_SquareRoot(inputExpressions: 
Seq[Expression])
 // Bitwise AND between two bands
 case class RS_BitwiseAnd(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-
-    var band1:Array[Double] = null
-    var band2:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
-      inputExpressions(1).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
-    ) {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     assert(band1.length == band2.length)
 
     new GenericArrayData(bitwiseAnd(band1, band2))
-
   }
 
   private def bitwiseAnd(band1: Array[Double], band2: 
Array[Double]):Array[Double] = {
@@ -758,27 +622,16 @@ case class RS_BitwiseAnd(inputExpressions: 
Seq[Expression])
 // Bitwise OR between two bands
 case class RS_BitwiseOr(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band1:Array[Double] = null
-    var band2:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
-      inputExpressions(1).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
-    ) {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     assert(band1.length == band2.length)
 
     new GenericArrayData(bitwiseOr(band1, band2))
-
   }
 
   private def bitwiseOr(band1: Array[Double], band2: 
Array[Double]):Array[Double] = {
@@ -803,27 +656,16 @@ case class RS_BitwiseOr(inputExpressions: Seq[Expression])
 // If the values in band1 and band2 differ, the value from band1 is returned; 
otherwise 0 is returned
 case class RS_LogicalDifference(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band1:Array[Double] = null
-    var band2:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
-      inputExpressions(1).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
-    ) {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     assert(band1.length == band2.length)
 
     new GenericArrayData(logicalDifference(band1, band2))
-
   }
 
   private def logicalDifference(band1: Array[Double], band2: 
Array[Double]):Array[Double] = {
@@ -855,27 +697,16 @@ case class RS_LogicalDifference(inputExpressions: 
Seq[Expression])
 // If a value in band1 is not equal to 0, band1 is returned; otherwise the value from 
band2 is returned
 case class RS_LogicalOver(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 2)
+
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 2)
-    var band1:Array[Double] = null
-    var band2:Array[Double] = null
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
-      inputExpressions(1).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
-    ) {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-      band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-    }
+    val band1 = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val band2 = 
inputExpressions(1).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     assert(band1.length == band2.length)
 
     new GenericArrayData(logicalOver(band1, band2))
-
   }
 
   private def logicalOver(band1: Array[Double], band2: 
Array[Double]):Array[Double] = {
@@ -906,19 +737,12 @@ case class RS_LogicalOver(inputExpressions: 
Seq[Expression])
 
 case class RS_Normalize(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
+  assert(inputExpressions.length == 1)
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
     // This is an expression which takes one input expression
-    var band:Array[Double] = null
-
-    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
-      band 
=inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
-    }
-    else {
-      band 
=inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
-
-    }
+    val band = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
     val result = normalize(band)
     new GenericArrayData(result)
   }
diff --git 
a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/SpatialIndexExec.scala
 
b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/SpatialIndexExec.scala
index d015551..43eb719 100644
--- 
a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/SpatialIndexExec.scala
+++ 
b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/SpatialIndexExec.scala
@@ -18,7 +18,7 @@
  */
 package org.apache.spark.sql.sedona_sql.strategy.join
 
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
 
 import org.apache.sedona.core.enums.IndexType
 import org.apache.spark.broadcast.Broadcast
diff --git a/sql/src/test/scala/org/apache/sedona/sql/GeometrySample.scala 
b/sql/src/test/scala/org/apache/sedona/sql/GeometrySample.scala
index 80d3682..0e5edba 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/GeometrySample.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/GeometrySample.scala
@@ -23,8 +23,7 @@ import org.apache.spark.sql.{Dataset, Row}
 import org.locationtech.jts.geom.Geometry
 import org.locationtech.jts.io.WKTReader
 
-import java.io.FileInputStream
-import scala.tools.nsc.interpreter.InputStream
+import java.io.{FileInputStream, InputStream}
 
 
 trait GeometrySample {
diff --git a/sql/src/test/scala/org/apache/sedona/sql/adapterTestScala.scala 
b/sql/src/test/scala/org/apache/sedona/sql/adapterTestScala.scala
index 3261715..ec0875a 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/adapterTestScala.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/adapterTestScala.scala
@@ -136,8 +136,8 @@ class adapterTestScala extends TestBaseScala with 
GivenWhenThen{
       assert(joinResultDf.schema(1).dataType == GeometryUDT)
       assert(joinResultDf.schema(0).name == "leftgeometry")
       assert(joinResultDf.schema(1).name == "rightgeometry")
-      import scala.collection.JavaConversions._
-      val joinResultDf2 = Adapter.toDf(joinResultPairRDD, 
polygonRDD.fieldNames, List(), sparkSession)
+      import scala.jdk.CollectionConverters._
+      val joinResultDf2 = Adapter.toDf(joinResultPairRDD, 
polygonRDD.fieldNames.asScala.toSeq, List(), sparkSession)
       assert(joinResultDf2.schema(0).dataType == GeometryUDT)
       assert(joinResultDf2.schema(0).name == "leftgeometry")
       assert(joinResultDf2.schema(1).name == "abc")
@@ -194,7 +194,7 @@ class adapterTestScala extends TestBaseScala with 
GivenWhenThen{
       val HDFDataVariableList:Array[String] = Array("LST", "QC", "Error_LST", 
"Emis_31", "Emis_32")
       val earthdataHDFPoint = new EarthdataHDFPointMapper(HDFincrement, 
HDFoffset, HDFrootGroupName, HDFDataVariableList, HDFDataVariableName, 
urlPrefix)
       val spatialRDD = new PointRDD(sparkSession.sparkContext, InputLocation, 
numPartitions, earthdataHDFPoint, StorageLevel.MEMORY_ONLY)
-      import scala.collection.JavaConverters._
+      import scala.jdk.CollectionConverters._
       spatialRDD.fieldNames = HDFDataVariableList.dropRight(4).toList.asJava
       val spatialDf = Adapter.toDf(spatialRDD, sparkSession)
       assert(spatialDf.schema.fields(1).name == "LST")
diff --git 
a/sql/src/test/scala/org/apache/sedona/sql/functions/TestStSubDivide.scala 
b/sql/src/test/scala/org/apache/sedona/sql/functions/TestStSubDivide.scala
index 5e40d74..abf3ea9 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/functions/TestStSubDivide.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/functions/TestStSubDivide.scala
@@ -25,9 +25,8 @@ import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.prop.{TableDrivenPropertyChecks, TableFor3, TableFor4}
 
-import java.io.FileInputStream
+import java.io.{FileInputStream, InputStream}
 import scala.io.Source
-import scala.tools.nsc.interpreter.InputStream
 
 
 class TestStSubDivide extends AnyFunSuite with Matchers with 
TableDrivenPropertyChecks with FunctionsHelper {
diff --git a/sql/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala 
b/sql/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala
index 23fea93..a6cf944 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala
@@ -75,7 +75,7 @@ class rasteralgebraTest extends TestBaseScala with 
BeforeAndAfter with GivenWhen
     it("Passed RS_Mode") {
       val inputDf = Seq((Seq(200.0, 400.0, 600.0, 200.0)), (Seq(200.0, 400.0, 
600.0, 700.0))).toDF("Band")
       val expectedResult = List(List(200.0), List(200.0, 400.0, 600.0, 700.0))
-      val actualResult = inputDf.selectExpr("RS_Mode(Band) as 
mode").as[List[Double]].collect().toList
+      val actualResult = inputDf.selectExpr("sort_array(RS_Mode(Band)) as 
mode").as[List[Double]].collect().toList
       val resultList = actualResult zip expectedResult
       for((actual, expected) <- resultList) {
         assert(actual == expected)
diff --git 
a/viz/src/main/scala/org/apache/sedona/viz/sql/SedonaVizExtensions.scala 
b/viz/src/main/scala/org/apache/sedona/viz/sql/SedonaVizExtensions.scala
index 671bf3c..48958b4 100644
--- a/viz/src/main/scala/org/apache/sedona/viz/sql/SedonaVizExtensions.scala
+++ b/viz/src/main/scala/org/apache/sedona/viz/sql/SedonaVizExtensions.scala
@@ -26,7 +26,7 @@ class SedonaVizExtensions extends (SparkSessionExtensions => 
Unit) {
   def apply(e: SparkSessionExtensions): Unit = {
     e.injectCheckRule(spark => {
       SedonaVizRegistrator.registerAll(spark)
-      _ => Unit
+      _ => ()
     })
   }
 }
diff --git 
a/viz/src/main/scala/org/apache/sedona/viz/sql/operator/VizPartitioner.scala 
b/viz/src/main/scala/org/apache/sedona/viz/sql/operator/VizPartitioner.scala
index 92bc969..8f2a804 100644
--- a/viz/src/main/scala/org/apache/sedona/viz/sql/operator/VizPartitioner.scala
+++ b/viz/src/main/scala/org/apache/sedona/viz/sql/operator/VizPartitioner.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{DataFrame, Row}
 import org.locationtech.jts.geom.{Envelope, Geometry}
 
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
 
 object VizPartitioner {
@@ -75,7 +75,7 @@ object VizPartitioner {
           currentValues(secondIdPos) = LineageDecoder(secondaryZone.lineage)
           // Assign primary id
           currentValues(primaryIdPos) = 
LineageDecoder(secondaryZone.lineage.take(zoomLevel))
-          list += Row.fromSeq(currentValues)
+          list += Row.fromSeq(currentValues.toSeq)
         })
       }
       list.iterator
diff --git 
a/viz/src/main/scala/org/apache/spark/sql/sedona_viz/expressions/Pixelize.scala 
b/viz/src/main/scala/org/apache/spark/sql/sedona_viz/expressions/Pixelize.scala
index bcd87b5..dafad27 100644
--- 
a/viz/src/main/scala/org/apache/spark/sql/sedona_viz/expressions/Pixelize.scala
+++ 
b/viz/src/main/scala/org/apache/spark/sql/sedona_viz/expressions/Pixelize.scala
@@ -77,7 +77,7 @@ case class ST_Pixelize(inputExpressions: Seq[Expression])
       }
     }
     assert(pixels.size() > 0)
-    import scala.collection.JavaConverters._
+    import scala.jdk.CollectionConverters._
     return new GenericArrayData(pixels.asScala.map(f=> {
       val out = new ByteArrayOutputStream()
       val kryo = new Kryo()

Reply via email to