[ https://issues.apache.org/jira/browse/SPARK-11330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14977600#comment-14977600 ]
Cheng Hao commented on SPARK-11330:
-----------------------------------

Hi, [~saif.a.ellafi], I've tried the code below:

{code}
case class Spark11330(
    account_id: Int, product: String, vint: String, band: String,
    age: Int, mb: String, yyyymm: String, balance: Float, balancec: Float)

test("SPARK-11330: Filter operation on StringType after groupBy PERSISTED brings no results") {
  withTempPath { f =>
    val d = Seq(
      Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200808", 1000.0f, 2000.0f),
      Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200808", 2000.0f, 2000.0f),
      Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200809", 2000.0f, 2000.0f),
      Spark11330(2, "product2", "2007-01-01", "band3", 29, "mb1", "200809", 2010.0f, 3000.0f))
    val data = List.tabulate[Seq[Spark11330]](10) { i => d }.flatten

    sqlContext.sparkContext.parallelize(data, 4)
      .toDF().write.format("parquet").mode(SaveMode.Overwrite).save(f.getAbsolutePath)
    val df = sqlContext.read.parquet(f.getAbsolutePath)

    // Persisted and non-persisted plans must return the same row.
    val f1 = df.groupBy("vint").count().persist().filter("vint = '2007-01-01'").first
    val f2 = df.groupBy("vint").count().filter("vint = '2007-01-01'").first
    assert(f1 == f2)

    val res = df
      .groupBy("product", "band", "age", "vint", "mb", "yyyymm")
      .agg(
        count($"account_id").as("N"),
        sum($"balance").as("balance_eom"),
        sum($"balancec").as("balance_eoc")).persist()

    // Results must not change after unpersist().
    val c1 = res.select("vint", "yyyymm").filter("vint='2007-01-01'").select("yyyymm").distinct.collect
    res.unpersist()
    val c2 = res.select("vint", "yyyymm").filter("vint='2007-01-01'").select("yyyymm").distinct.collect
    assert(c1.sameElements(c2))
  }
}
{code}

Everything seems to work fine, so I am not sure if I missed something. Can you try to reproduce the issue based on my code?
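One hedged diagnostic worth trying when a filter over a persisted DataFrame drops rows: the in-memory columnar cache in Spark 1.5.x skips cached batches based on per-batch column statistics, and that skipping is controlled by the {{spark.sql.inMemoryColumnarStorage.partitionPruning}} conf. This is a diagnostic sketch, not a confirmed root cause for this ticket:

{code}
// Diagnostic sketch (assumes Spark 1.5.x). If disabling stats-based
// batch skipping makes the persisted filter return rows again, the
// cached-batch statistics for the string column are the likely culprit.
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.partitionPruning", "false")

val cached = df.groupBy("vint").count().persist()
cached.filter("vint = '2007-01-01'").show()
{code}

If the results differ with the flag on vs. off, that observation would help narrow the bug down.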
> Filter operation on StringType after groupBy PERSISTED brings no results
> ------------------------------------------------------------------------
>
>                 Key: SPARK-11330
>                 URL: https://issues.apache.org/jira/browse/SPARK-11330
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.1
>         Environment: Stand-alone cluster of five servers (happens as well in local mode). sqlContext is an instance of HiveContext (happens as well with SQLContext).
> No special options other than driver memory and executor memory.
> Parquet partitions are 512 where there are 160 cores. Happens as well with other partitioning.
> Data is nearly 2 billion rows.
>            Reporter: Saif Addin Ellafi
>            Priority: Blocker
>
> ONLY HAPPENS WHEN PERSIST() IS CALLED
>
> val data = sqlContext.read.parquet("/var/data/Saif/ppnr_pqt")
> data.groupBy("vintages").count.select("vintages").filter("vintages = '2007-01-01'").first
> >>> res9: org.apache.spark.sql.Row = [2007-01-01]
>
> data.groupBy("vintages").count.persist.select("vintages").filter("vintages = '2007-01-01'").first
> >>> Exception on empty iterator stuff
>
> This does not happen with another field type, e.g. IntType:
> data.groupBy("yyyymm").count.persist.select("yyyymm").filter("yyyymm = 200805").first
> >>> res13: org.apache.spark.sql.Row = [200805]
>
> NOTE: I have no idea whether I used the KRYO serializer when I stored this parquet.
> NOTE2: If persist() is set after the filtering, it works fine. But this is not a good enough workaround, since any filter operation afterwards will break the results.
> NOTE3: I have reproduced the issue with several different datasets.
> NOTE4: The only real workaround is to store the groupBy dataframe in a database and reload it as a new dataframe.
> Querying the raw data works fine:
> data.select("vintages").filter("vintages = '2007-01-01'").first
> >>> res4: org.apache.spark.sql.Row = [2007-01-01]
>
> Originally, the issue happened with a larger aggregation operation; the result was that the data was inconsistent, returning different results on every call. Reducing the operation step by step, I narrowed it down to this issue.
> In any case, the original operation was:
>
> val data = sqlContext.read.parquet("/var/Saif/data_pqt")
> val res = data.groupBy("product", "band", "age", "vint", "mb", "yyyymm").agg(count($"account_id").as("N"), sum($"balance").as("balance_eom"), sum($"balancec").as("balance_eoc"), sum($"spend").as("spend"), sum($"payment").as("payment"), sum($"feoc").as("feoc"), sum($"cfintbal").as("cfintbal"), count($"newacct" === 1).as("newacct")).persist()
>
> val z = res.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct.collect
> z.length
> >>> res0: Int = 102
>
> res.unpersist()
> val z = res.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct.collect
> z.length
> >>> res1: Int = 103

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
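The invariant the report describes — a groupBy over a string column, then an equality filter, must give the same rows whether or not the intermediate result was materialized — can be sketched with a plain-Scala analogue. This is purely illustrative (no Spark; the object and column names are made up for the example), showing the idempotence that SPARK-11330 violates:

{code}
// Plain-Scala analogue (no Spark): groupBy on a string key, then filter.
// Materializing ("persisting") the grouped result must not change the rows
// the filter returns.
object PersistFilterInvariant {
  // (vint, yyyymm) pairs mimicking the reporter's columns
  val rows: Seq[(String, String)] = Seq(
    ("2007-01-01", "200808"),
    ("2007-01-01", "200809"),
    ("2008-01-01", "200809"))

  // Analogue of groupBy("vint").count()
  def grouped: Map[String, Int] =
    rows.groupBy(_._1).map { case (k, v) => (k, v.size) }

  def main(args: Array[String]): Unit = {
    // Recomputed each time, like an unpersisted DataFrame.
    val recomputed = grouped.filter { case (k, _) => k == "2007-01-01" }
    // Materialized once, like persist() followed by the same filter.
    val cached     = grouped
    val fromCache  = cached.filter { case (k, _) => k == "2007-01-01" }
    // SPARK-11330: in Spark 1.5.1 the persisted path returned no rows here.
    assert(recomputed == fromCache)
    println(fromCache) // Map(2007-01-01 -> 2)
  }
}
{code}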