Struchkov Lookuut Fedorovich created SPARK-33059: ----------------------------------------------------
Summary: Bug with self joined tables after posexploded column Key: SPARK-33059 URL: https://issues.apache.org/jira/browse/SPARK-33059 Project: Spark Issue Type: Bug Components: ML, MLlib Affects Versions: 3.0.1 Environment: OS ubuntu 16.07 LTS 8GB Ram Intel Core i7 Reporter: Struchkov Lookuut Fedorovich The test below should pass, but it does not. The DataFrames differ in only one way: the validDF data column type is Array[Int], while in testDF it is Array[Vector]. We encountered this bug when using MinHashLSH from Spark ML. {code:java} // Bug with self joined tables with posexplode columns import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.scalatest.flatspec.AnyFlatSpec import org.apache.spark.sql.functions.{udf, col, struct, posexplode} import org.apache.spark.ml.linalg.Vectors import scala.collection.mutable case class TestRow(s: String, l: Array[Long], data: Array[Int]) class SparkBug extends AnyFlatSpec{ "Test case" should "equal count" in { val conf = new SparkConf() .setMaster("local[8]") .set("spark.sql.codegen.wholeStage", "true")// important: if this is set to false, the test passes val spark = SparkSession.builder().config(conf).getOrCreate() val vecUdf = udf( (s: mutable.WrappedArray[Long]) => s.map(v => Vectors.dense(v)) ) val r = new scala.util.Random(1000) import spark.implicits._ val rowCount = 200000// important: if the row count is less than 200000 (150000, for example), the test passes val df = (0 to rowCount).map{ case _ => TestRow( "", (0 to 40).map(t => t.toLong).toArray, (0 to 4) .map(_ => r.nextInt(100000)).toArray ) }.toDF() .coalesce(1)// important: without coalesce(1) the test passes val testDF = df .withColumn("data", vecUdf(col("data"))) .select( struct(col("*")).as("dd"), posexplode(col("data")) ) val validDF = df.select(struct(col("*")).as("dd"), posexplode(col("data"))) // The difference between validDF and testDF is that the testDF data column has type Array[Vector], while the validDF data column has type Array[Long] val testCount = testDF 
.join(testDF, Seq("pos", "col")) .distinct()// important: without distinct() the test passes .count val validCount = validDF .join(validDF, Seq("pos", "col")) .distinct() .count // the counts should be equal, but they are not assert(testCount == validCount) } } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org