EBernhardson has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/395617 )

Change subject: AtLeastNDistinct returns wrong value on merge
......................................................................

AtLeastNDistinct returns wrong value on merge

The merge operation only consulted buffer1's reached flag and
never buffer2's, so merging could return the wrong value. Update
merge to mark buffer1 as reached whenever buffer2 is, and add
some tests to verify how update and merge should work.

Change-Id: Ib37b60e4f4ae2354d1d1181460e1b511c0c13cc2
---
M jvm/src/main/scala/org/wikimedia/search/mjolnir/AtLeastNDistinct.scala
A jvm/src/test/scala/org/wikimedia/search/mjolnir/AtLeastNDistinctSuite.scala
2 files changed, 72 insertions(+), 1 deletion(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR 
refs/changes/17/395617/1

diff --git 
a/jvm/src/main/scala/org/wikimedia/search/mjolnir/AtLeastNDistinct.scala 
b/jvm/src/main/scala/org/wikimedia/search/mjolnir/AtLeastNDistinct.scala
index 1b49e25..74d7467 100644
--- a/jvm/src/main/scala/org/wikimedia/search/mjolnir/AtLeastNDistinct.scala
+++ b/jvm/src/main/scala/org/wikimedia/search/mjolnir/AtLeastNDistinct.scala
@@ -57,7 +57,9 @@
   }
 
   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
-    if (!buffer1.getAs[Boolean](buffer_reached)) {
+    if (buffer2.getAs[Boolean](buffer_reached)) {
+      buffer1(buffer_reached) = true
+    } else if (!buffer1.getAs[Boolean](buffer_reached)) {
       getSet(buffer1) ++= getSet(buffer2)
       checkReached(buffer1)
     }
diff --git 
a/jvm/src/test/scala/org/wikimedia/search/mjolnir/AtLeastNDistinctSuite.scala 
b/jvm/src/test/scala/org/wikimedia/search/mjolnir/AtLeastNDistinctSuite.scala
new file mode 100644
index 0000000..962dbb9
--- /dev/null
+++ 
b/jvm/src/test/scala/org/wikimedia/search/mjolnir/AtLeastNDistinctSuite.scala
@@ -0,0 +1,69 @@
+package org.wikimedia.search.mjolnir
+
+import org.apache.spark.sql.expressions.MutableAggregationBuffer
+import org.scalatest.FunSuite
+
+class DummyBuffer(init: Array[Any]) extends MutableAggregationBuffer {
+  val values: Array[Any] = init
+  def update(i: Int, value: Any): Unit = values(i) = value
+  def get(i: Int) = values(i)
+  def length: Int = init.length
+  def copy() = new DummyBuffer(init.clone())
+}
+
+class AtLeastNDistinctSuite extends FunSuite {
+  import org.scalatest.prop.TableDrivenPropertyChecks._
+
+  test("basic operation") {
+    val udaf = new AtLeastNDistinct
+    val buf = new DummyBuffer(new Array(udaf.bufferSchema.length))
+    val row = new DummyBuffer(new Array(udaf.inputSchema.length))
+
+    forAll(Table(
+      ("limit", "expected", "values"),
+      (1, false, Seq()),
+      (1, true, Seq("zomg")),
+      (1, true, Seq("hi", "hi", "hi")),
+      (2, false, Seq("hi", "hi", "hi")),
+      (2, true, Seq("hi", "there", "hi"))
+    )) { (limit: Int, expect: Boolean, values: Seq[String]) =>
+      udaf.initialize(buf)
+      row(udaf.input_limit) = limit
+      values.foreach { value =>
+        row(udaf.input_value) = value
+        udaf.update(buf, row)
+      }
+      assert(udaf.evaluate(buf) == expect)
+    }
+  }
+
+  test("merge") {
+    val udaf = new AtLeastNDistinct
+    val buf1 = new DummyBuffer(new Array(udaf.bufferSchema.length))
+    val buf2 = new DummyBuffer(new Array(udaf.bufferSchema.length))
+    val row = new DummyBuffer(new Array(udaf.inputSchema.length))
+
+    forAll(Table(
+      ("limit", "expected", "a", "b"),
+      (1, true, Set("a"), Set[String]()),
+      (1, true, Set[String](), Set("a")),
+      (2, false, Set("a"), Set("a")),
+      (2, true, Set("a"), Set("b"))
+    )) { (limit: Int, expect: Boolean, a: Set[String], b: Set[String]) =>
+      udaf.initialize(buf1)
+      udaf.initialize(buf2)
+      row(udaf.input_limit) = limit
+      a.foreach { value =>
+        row(udaf.input_value) = value
+        udaf.update(buf1, row)
+      }
+      b.foreach { value =>
+        row(udaf.input_value) = value
+        udaf.update(buf2, row)
+      }
+
+      udaf.merge(buf1, buf2)
+      assert(udaf.evaluate(buf1) == expect)
+    }
+  }
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/395617
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib37b60e4f4ae2354d1d1181460e1b511c0c13cc2
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to