cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r882695556


##########
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##########
@@ -203,6 +204,355 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkAnswer(df5, Seq(Row(10000.00, 1000.0, "amy")))
   }
 
+  private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit = {
+    val offsets = df.queryExecution.optimizedPlan.collect {
+      case offset: Offset => offset
+    }
+    if (removed) {
+      assert(offsets.isEmpty)
+    } else {
+      assert(offsets.nonEmpty)
+    }
+  }
+
+  test("simple scan with OFFSET") {
+    val df1 = spark.read
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .offset(1)
+    checkOffsetRemoved(df1)
+    checkPushedInfo(df1,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,")
+    checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+    val df2 = spark.read
+      .option("pushDownOffset", "false")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .offset(1)
+    checkOffsetRemoved(df2, false)
+    checkPushedInfo(df2,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+    checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+    val df3 = spark.read
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .sort($"salary")
+      .offset(1)
+    checkOffsetRemoved(df3, false)
+    checkPushedInfo(df3,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+    checkAnswer(df3, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
+
+    val df4 = spark.read
+      .option("partitionColumn", "dept")
+      .option("lowerBound", "0")
+      .option("upperBound", "2")
+      .option("numPartitions", "2")
+      .table("h2.test.employee")
+      .filter($"dept" > 1)
+      .offset(1)
+    checkOffsetRemoved(df4, false)
+    checkPushedInfo(df4, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], ReadSchema:")
+    checkAnswer(df4, Seq(Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true)))
+
+    val df5 = spark.read
+      .table("h2.test.employee")
+      .groupBy("DEPT").sum("SALARY")
+      .offset(1)
+    checkOffsetRemoved(df5, false)
+    checkPushedInfo(df5,
+      "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByExpressions: [DEPT], ")
+    checkAnswer(df5, Seq(Row(2, 22000.00), Row(6, 12000.00)))
+
+    val name = udf { (x: String) => x.matches("cat|dav|amy") }
+    val sub = udf { (x: String) => x.substring(0, 3) }
+    val df6 = spark.read
+      .table("h2.test.employee")
+      .select($"SALARY", $"BONUS", sub($"NAME").as("shortName"))
+      .filter(name($"shortName"))
+      .offset(1)
+    checkOffsetRemoved(df6, false)
+    // OFFSET is pushed down only if all the filters are pushed down
+    checkPushedInfo(df6, "PushedFilters: [], ")
+    checkAnswer(df6, Seq(Row(10000.00, 1300.0, "dav"), Row(9000.00, 1200.0, "cat")))
+  }
+
+  test("simple scan with LIMIT and OFFSET") {
+    val df1 = spark.read
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .limit(2)
+      .offset(1)
+    checkLimitRemoved(df1)
+    checkOffsetRemoved(df1)
+    checkPushedInfo(df1,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 2, PushedOffset: OFFSET 1,")
+    checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+    val df2 = spark.read
+      .option("pushDownLimit", "false")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .limit(2)
+      .offset(1)
+    checkLimitRemoved(df2, false)
+    checkOffsetRemoved(df2, false)
+    checkPushedInfo(df2,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+    checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+    val df3 = spark.read
+      .option("pushDownOffset", "false")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .limit(2)
+      .offset(1)
+    checkLimitRemoved(df3)
+    checkOffsetRemoved(df3, false)
+    checkPushedInfo(df3,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 2, ReadSchema:")
+    checkAnswer(df3, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+    val df4 = spark.read
+      .option("pushDownLimit", "false")
+      .option("pushDownOffset", "false")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .limit(2)
+      .offset(1)
+    checkLimitRemoved(df4, false)
+    checkOffsetRemoved(df4, false)
+    checkPushedInfo(df4,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+    checkAnswer(df4, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+    val df5 = spark.read
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .sort($"salary")
+      .limit(2)
+      .offset(1)
+    checkLimitRemoved(df5)
+    checkOffsetRemoved(df5)
+    checkPushedInfo(df5, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], " +
+      "PushedOffset: OFFSET 1, PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 2, ReadSchema:")
+    checkAnswer(df5, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
+
+    val df6 = spark.read
+      .option("pushDownLimit", "false")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .sort($"salary")
+      .limit(2)
+      .offset(1)
+    checkLimitRemoved(df6, false)
+    checkOffsetRemoved(df6, false)
+    checkPushedInfo(df6, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+    checkAnswer(df6, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
+
+    val df7 = spark.read
+      .option("pushDownOffset", "false")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .sort($"salary")
+      .limit(2)
+      .offset(1)
+    checkLimitRemoved(df7)
+    checkOffsetRemoved(df7, false)
+    checkPushedInfo(df7, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1]," +
+      " PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 2, ReadSchema:")
+    checkAnswer(df7, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
+
+    val df8 = spark.read
+      .option("pushDownLimit", "false")
+      .option("pushDownOffset", "false")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .sort($"salary")
+      .limit(2)
+      .offset(1)
+    checkLimitRemoved(df8, false)
+    checkOffsetRemoved(df8, false)
+    checkPushedInfo(df8, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+    checkAnswer(df8, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
+
+    val df9 = spark.read
+      .option("partitionColumn", "dept")
+      .option("lowerBound", "0")
+      .option("upperBound", "2")
+      .option("numPartitions", "2")
+      .table("h2.test.employee")
+      .filter($"dept" > 1)
+      .limit(2)
+      .offset(1)
+    checkLimitRemoved(df9, false)
+    checkOffsetRemoved(df9, false)
+    checkPushedInfo(df9,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], PushedLimit: LIMIT 2, ReadSchema:")
+    checkAnswer(df9, Seq(Row(2, "david", 10000.00, 1300.0, true)))
+
+    val df10 = spark.read
+      .table("h2.test.employee")
+      .groupBy("DEPT").sum("SALARY")
+      .limit(2)
+      .offset(1)
+    checkLimitRemoved(df10, false)
+    checkOffsetRemoved(df10, false)
+    checkPushedInfo(df10,
+      "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByExpressions: [DEPT], ")
+    checkAnswer(df10, Seq(Row(2, 22000.00)))
+
+    val name = udf { (x: String) => x.matches("cat|dav|amy") }
+    val sub = udf { (x: String) => x.substring(0, 3) }
+    val df11 = spark.read
+      .table("h2.test.employee")
+      .select($"SALARY", $"BONUS", sub($"NAME").as("shortName"))
+      .filter(name($"shortName"))
+      .limit(2)
+      .offset(1)
+    checkLimitRemoved(df11, false)
+    checkOffsetRemoved(df11, false)
+    checkPushedInfo(df11, "PushedFilters: [], ")
+    checkAnswer(df11, Seq(Row(9000.00, 1200.0, "cat")))
+  }
+
+  test("simple scan with OFFSET and LIMIT") {
+    val df1 = spark.read
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .offset(1)
+      .limit(1)
+    checkLimitRemoved(df1)
+    checkOffsetRemoved(df1)
+    checkPushedInfo(df1,
+      "[DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 1, PushedOffset: OFFSET 1,")
+    checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+    val df2 = spark.read
+      .option("pushDownOffset", "false")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .offset(1)
+      .limit(1)
+    checkLimitRemoved(df2, false)
+    checkOffsetRemoved(df2, false)
+    checkPushedInfo(df2,
+      "[DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 2, ReadSchema:")
+    checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+    val df3 = spark.read
+      .option("pushDownLimit", "false")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .offset(1)
+      .limit(1)
+    checkLimitRemoved(df3, false)
+    checkOffsetRemoved(df3)
+    checkPushedInfo(df3,
+      "[DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1, ReadSchema:")
+    checkAnswer(df3, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+    val df4 = spark.read
+      .option("pushDownOffset", "false")
+      .option("pushDownLimit", "false")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .offset(1)
+      .limit(1)
+    checkLimitRemoved(df4, false)
+    checkOffsetRemoved(df4, false)
+    checkPushedInfo(df4,
+      "[DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+    checkAnswer(df4, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+    val df5 = spark.read
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .sort($"salary")
+      .offset(1)
+      .limit(1)
+    checkLimitRemoved(df5, false)
+    checkOffsetRemoved(df5)
+    checkPushedInfo(df5, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], " +
+      "PushedOffset: OFFSET 1, PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 2, ReadSchema:")
+    checkAnswer(df5, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
+
+    val df6 = spark.read
+      .option("pushDownOffset", "false")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .sort($"salary")
+      .offset(1)
+      .limit(1)
+    checkLimitRemoved(df6, false)
+    checkOffsetRemoved(df6, false)
+    checkPushedInfo(df6, "[DEPT IS NOT NULL, DEPT = 1]," +
+      " PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 2, ReadSchema:")
+    checkAnswer(df6, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
+
+    val df7 = spark.read
+      .option("pushDownLimit", "false")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .sort($"salary")
+      .offset(1)
+      .limit(1)
+    checkLimitRemoved(df7, false)
+    checkOffsetRemoved(df7, false)
+    checkPushedInfo(df7, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+    checkAnswer(df7, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
+
+    val df8 = spark.read
+      .option("pushDownOffset", "false")
+      .option("pushDownLimit", "false")
+      .table("h2.test.employee")
+      .where($"dept" === 1)
+      .sort($"salary")
+      .offset(1)
+      .limit(1)
+    checkLimitRemoved(df8, false)
+    checkOffsetRemoved(df8, false)
+    checkPushedInfo(df8, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+    checkAnswer(df8, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
+
+    val df9 = spark.read
+      .option("partitionColumn", "dept")
+      .option("lowerBound", "0")
+      .option("upperBound", "2")
+      .option("numPartitions", "2")
+      .table("h2.test.employee")
+      .filter($"dept" > 1)
+      .offset(1)
+      .limit(1)
+    checkLimitRemoved(df9, false)
+    checkOffsetRemoved(df9, false)
+    checkPushedInfo(df9,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], PushedLimit: LIMIT 2, ReadSchema:")
+    checkAnswer(df9, Seq(Row(2, "david", 10000.00, 1300.0, true)))
+
+    val df10 = sql("SELECT dept, sum(salary) FROM h2.test.employee group by dept LIMIT 1 OFFSET 1")

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to