[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16970





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r102579830
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1996,7 +1996,7 @@ class Dataset[T] private[sql](
   def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
--- End diff --

Done





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r102572126
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1996,7 +1996,7 @@ class Dataset[T] private[sql](
   def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
--- End diff --

You have not updated the docs for dropDuplicates! You should at least point to 
withWatermark to limit the state, and mention its semantics (all data later 
than the watermark will be ignored).





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r102394580
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -321,3 +327,66 @@ case class MapGroupsWithStateExec(
   }
   }
 }
+
+
+/** Physical operator for executing streaming Deduplication. */
+case class DeduplicationExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateId: Option[OperatorStateId] = None,
+eventTimeWatermark: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(keyExpressions) :: Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+metrics // force lazy init at driver
+
+child.execute().mapPartitionsWithStateStore(
+  getStateId.checkpointLocation,
+  operatorId = getStateId.operatorId,
+  storeVersion = getStateId.batchId,
+  keyExpressions.toStructType,
+  child.output.toStructType,
+  sqlContext.sessionState,
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+  val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+  val numOutputRows = longMetric("numOutputRows")
+  val numTotalStateRows = longMetric("numTotalStateRows")
+  val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+
+  val baseIterator = watermarkPredicate match {
+case Some(predicate) => iter.filter((row: InternalRow) => 
!predicate.eval(row))
+case None => iter
+  }
+
+  while (baseIterator.hasNext) {
+val row = baseIterator.next().asInstanceOf[UnsafeRow]
+val key = getKey(row)
+val value = store.get(key)
+if (value.isEmpty) {
+  store.put(key.copy(), row.copy())
--- End diff --

Cool! Updated.
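
For context, a minimal standalone sketch of the technique the quoted operator implements: keep only the first row seen per key across batches, after filtering out rows that are too late. A plain mutable Set stands in for the per-partition state store and the watermark predicate is reduced to a simple function; every name here is illustrative, not the PR's API.

```scala
import scala.collection.mutable

// Emit each key at most once across micro-batches by remembering the keys already seen.
def dedupBatch[K, R](
    seenKeys: mutable.Set[K],                    // stand-in for the per-partition state store
    batch: Iterator[R],
    key: R => K,
    isTooLate: R => Boolean): Iterator[R] = {    // stand-in for the watermark predicate
  batch.filterNot(isTooLate).filter { row =>
    val k = key(row)
    if (seenKeys.contains(k)) {
      false                                      // duplicate: drop the row
    } else {
      seenKeys += k                              // first occurrence: record the key, emit the row
      true
    }
  }
}

// Two "micro-batches" sharing the same state:
val state = mutable.Set.empty[String]
val out1 = dedupBatch[String, String](state, Iterator("a", "a", "b"), identity, _ => false).toList
val out2 = dedupBatch[String, String](state, Iterator("a", "c"), identity, _ => false).toList
// out1 == List("a", "b"); out2 == List("c")
```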





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r102394378
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1996,7 +1996,7 @@ class Dataset[T] private[sql](
   def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
--- End diff --

Done





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r102391414
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1996,7 +1996,7 @@ class Dataset[T] private[sql](
   def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
 val resolver = sparkSession.sessionState.analyzer.resolver
 val allColumns = queryExecution.analyzed.output
-val groupCols = colNames.flatMap { colName =>
+val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
--- End diff --

The results will be the same. It's just pretty weird that it depends on the 
optimizer to remove duplicated columns.





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r102381390
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1996,7 +1996,7 @@ class Dataset[T] private[sql](
   def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
--- End diff --

You have to add more documentation for streaming usage! In particular, you have 
to document that this will keep all past data as intermediate state, and that you 
can use `withWatermark` to limit how late the duplicate data can be, so that the 
system can correspondingly limit the state.

Also, double-check the docs on withWatermark and make sure they are consistent.
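
To make that concrete, here is a sketch of the kind of example the docs could carry, mirroring the watermark test quoted elsewhere in this thread (MemoryStream and the column names are taken from those tests; this is illustrative usage, not the PR's final doc text):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").appName("dedup-doc-sketch").getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext

val inputData = MemoryStream[Int]
val deduped = inputData.toDS()
  .withColumn("eventTime", $"value".cast("timestamp"))
  // Without the watermark, dropDuplicates must keep every row it has ever seen as state.
  // With it, duplicates arriving more than 10 seconds late are ignored and old state
  // can eventually be dropped.
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates()
  .select($"eventTime".cast("long").as[Long])
```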





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r102380326
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicationSuite.scala 
---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.functions._
+
+class DeduplicationSuite extends StateStoreMetricsTest with 
BeforeAndAfterAll {
+
+  import testImplicits._
+
+  override def afterAll(): Unit = {
+super.afterAll()
+StateStore.stop()
+  }
+
+  test("deduplication with all columns") {
+val inputData = MemoryStream[String]
+val result = inputData.toDS().dropDuplicates()
+
+testStream(result, Append)(
+  AddData(inputData, "a"),
+  CheckLastBatch("a"),
+  assertNumStateRows(total = 1, updated = 1),
+  AddData(inputData, "a"),
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+  AddData(inputData, "b"),
+  CheckLastBatch("b"),
+  assertNumStateRows(total = 2, updated = 1)
+)
+  }
+
+  test("deduplication with some columns") {
+val inputData = MemoryStream[(String, Int)]
+val result = inputData.toDS().dropDuplicates("_1")
+
+testStream(result, Append)(
+  AddData(inputData, "a" -> 1),
+  CheckLastBatch("a" -> 1),
+  assertNumStateRows(total = 1, updated = 1),
+  AddData(inputData, "a" -> 2), // Dropped
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+  AddData(inputData, "b" -> 1),
+  CheckLastBatch("b" -> 1),
+  assertNumStateRows(total = 2, updated = 1)
+)
+  }
+
+  test("multiple deduplications") {
+val inputData = MemoryStream[(String, Int)]
+val result = inputData.toDS().dropDuplicates().dropDuplicates("_1")
+
+testStream(result, Append)(
+  AddData(inputData, "a" -> 1),
+  CheckLastBatch("a" -> 1),
+  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+
+  AddData(inputData, "a" -> 2), // Dropped from the second 
`dropDuplicates`
+  CheckLastBatch(),
+  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L)),
+
+  AddData(inputData, "b" -> 1),
+  CheckLastBatch("b" -> 1),
+  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+)
+  }
+
+  test("deduplication with watermark") {
+val inputData = MemoryStream[Int]
+val result = inputData.toDS()
+  .withColumn("eventTime", $"value".cast("timestamp"))
+  .withWatermark("eventTime", "10 seconds")
+  .dropDuplicates()
+  .select($"eventTime".cast("long").as[Long])
+
+testStream(result, Append)(
+  AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+  CheckLastBatch(10 to 15: _*),
+  assertNumStateRows(total = 6, updated = 6),
+
+  AddData(inputData, 25), // Advance watermark to 15 seconds
+  CheckLastBatch(25),
+  assertNumStateRows(total = 7, updated = 1),
+
+  AddData(inputData, 25), // Drop states less than watermark
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+
+  AddData(inputData, 10), // Should not emit anything as data less 
than watermark
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+
+  AddData(inputData, 45), // Advance watermark to 35 seconds
+  CheckLastBatch(45),
+  assertNumStateRows(total = 2, updated = 1),
+
+  AddData(inputData, 45), // Drop states less than watermark
+  CheckLa

[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r102379272
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -321,3 +327,66 @@ case class MapGroupsWithStateExec(
   }
   }
 }
+
+
+/** Physical operator for executing streaming Deduplication. */
+case class DeduplicationExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateId: Option[OperatorStateId] = None,
+eventTimeWatermark: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(keyExpressions) :: Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+metrics // force lazy init at driver
+
+child.execute().mapPartitionsWithStateStore(
+  getStateId.checkpointLocation,
+  operatorId = getStateId.operatorId,
+  storeVersion = getStateId.batchId,
+  keyExpressions.toStructType,
+  child.output.toStructType,
+  sqlContext.sessionState,
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+  val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+  val numOutputRows = longMetric("numOutputRows")
+  val numTotalStateRows = longMetric("numTotalStateRows")
+  val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+
+  val baseIterator = watermarkPredicate match {
+case Some(predicate) => iter.filter((row: InternalRow) => 
!predicate.eval(row))
+case None => iter
+  }
+
+  while (baseIterator.hasNext) {
+val row = baseIterator.next().asInstanceOf[UnsafeRow]
+val key = getKey(row)
+val value = store.get(key)
+if (value.isEmpty) {
+  store.put(key.copy(), row.copy())
--- End diff --

How about `UnsafeRow.createFromByteArray(1, 1)` (a 1-byte array) rather than 
copying it completely?
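
A minimal illustration of the idea (the exact sizing, and whether a given state store backend accepts such a row, is exactly what the follow-up comments work out):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// The state store only needs to remember *that* a key was seen, not the full row,
// so one shared, immutable placeholder value can be reused for every key.
// createFromByteArray(numBytes, numFields) is an existing UnsafeRow factory;
// the (1, 1) sizing simply mirrors the suggestion above.
val placeholderValue: UnsafeRow = UnsafeRow.createFromByteArray(1, 1)

// In the operator's loop this would replace `row.copy()` as the stored value:
//   store.put(key.copy(), placeholderValue)
```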





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r102378877
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1996,7 +1996,7 @@ class Dataset[T] private[sql](
   def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
 val resolver = sparkSession.sessionState.analyzer.resolver
 val allColumns = queryExecution.analyzed.output
-val groupCols = colNames.flatMap { colName =>
+val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
--- End diff --

Was this a bug with batch queries as well? And what would the result be 
without this fix?





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r102378594
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 ---
@@ -129,6 +156,33 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
{
 outputMode = Complete,
 expectedMsgs = Seq("(map/flatMap)GroupsWithState"))
 
+  assertSupportedInStreamingPlan(
+"mapGroupsWithState - mapGroupsWithState on batch relation inside 
streaming relation",
+MapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, 
Seq(att), batchRelation),
+outputMode = Append
+  )
+
+  // Deduplication:  Not supported after a streaming aggregation
+  assertSupportedInStreamingPlan(
+"Deduplication - Deduplication on streaming relation before 
aggregation",
+Aggregate(
+  Seq(attributeWithWatermark),
+  aggExprs("c"),
+  Deduplication(Seq(att), streamRelation, streaming = true)),
+outputMode = Append)
+
+  assertNotSupportedInStreamingPlan(
+"Deduplication - Deduplication on streaming relation after 
aggregation",
+Deduplication(Seq(att), Aggregate(Nil, aggExprs("c"), streamRelation), 
streaming = true),
+outputMode = Complete,
+expectedMsgs = Seq("dropDuplicates"))
+
+  assertSupportedInStreamingPlan(
+"Deduplication - Deduplication on batch relation inside streaming 
relation",
--- End diff --

nit: say "inside a streaming query"; it sounds weird otherwise.





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r102378522
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 ---
@@ -129,6 +156,33 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
{
 outputMode = Complete,
 expectedMsgs = Seq("(map/flatMap)GroupsWithState"))
 
+  assertSupportedInStreamingPlan(
+"mapGroupsWithState - mapGroupsWithState on batch relation inside 
streaming relation",
+MapGroupsWithState(null, att, att, Seq(att), Seq(att), att, att, 
Seq(att), batchRelation),
+outputMode = Append
+  )
+
+  // Deduplication:  Not supported after a streaming aggregation
--- End diff --

nit: Change this comment to just `// Deduplication` to reflect the whole 
subsection 





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r102378336
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1143,6 +1144,24 @@ object ReplaceDistinctWithAggregate extends 
Rule[LogicalPlan] {
 }
 
 /**
+ * Replaces logical [[Deduplication]] operator with an [[Aggregate]] 
operator.
+ */
+object ReplaceDeduplicationWithAggregate extends Rule[LogicalPlan] {
--- End diff --

`ReplaceDeduplicateWithAggregate`. See the comment below.





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r102378293
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -869,3 +869,12 @@ case object OneRowRelation extends LeafNode {
   override def output: Seq[Attribute] = Nil
   override def computeStats(conf: CatalystConf): Statistics = 
Statistics(sizeInBytes = 1)
 }
+
+/** A logical plan for `dropDuplicates`. */
+case class Deduplication(
--- End diff --

Most names are like "verbs" - aggregate, project, intersect. I think it's 
best to name this "Deduplicate".





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r102290771
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1996,7 +1996,7 @@ class Dataset[T] private[sql](
   def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
 val resolver = sparkSession.sessionState.analyzer.resolver
 val allColumns = queryExecution.analyzed.output
-val groupCols = colNames.flatMap { colName =>
+val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
--- End diff --

Fixed an issue where `groupCols` may contain duplicated columns. Without 
this fix, `org.apache.spark.sql.DatasetSuite.dropDuplicates: columns with same 
column name` will fail because the hash keys are different.

Before my change, in the `org.apache.spark.sql.DatasetSuite.dropDuplicates: 
columns with same column name` test, the Dataset has two columns both called `_2`, so 
`groupCols` will contain 4 columns. However, the optimizer reduces them to 2 columns.

After introducing the new Deduplication operator, that optimization rule no longer 
applies because the plan is not an Aggregate, so it will still use 4 columns as the 
group keys.

This exposes one potential breaking change: some optimization rules may not 
work after this change.
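
A simplified illustration of the difference (plain strings stand in for the resolved attributes; the real code resolves colNames against the analyzed output):

```scala
// Two output columns share the name "_2", as in the DatasetSuite test above.
val allColumns = Seq("_1", "_2", "_2")
val colNames   = Seq("_2", "_2")

// Old behaviour: each requested name matches both "_2" attributes, twice over.
val groupColsOld = colNames.flatMap(name => allColumns.filter(_ == name))
// => Seq("_2", "_2", "_2", "_2")   (4 grouping keys)

// New behaviour: deduplicate the requested names first.
val groupColsNew = colNames.toSet.toSeq.flatMap((name: String) => allColumns.filter(_ == name))
// => Seq("_2", "_2")               (2 grouping keys)
```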





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101878586
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -240,6 +241,9 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   rewrittenResultExpressions,
   planLater(child))
 
+  case Deduplication(keys, child) =>
--- End diff --

Same thought as below: this is not really an aggregation, so it should be a 
different strategy.
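
For illustration, a dedicated strategy could look roughly like the sketch below. It is assumed to sit inside `SparkStrategies`, where the `Strategy` alias and `planLater` are in scope, and it reuses the `Deduplication` logical node and `DeduplicationExec` physical node from the diffs in this thread, so the exact names and fields may differ from what the PR finally settles on:

```scala
// Sketch only: give streaming deduplication its own planning rule instead of
// routing it through the aggregation strategy.
object StreamingDeduplicationStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case Deduplication(keys, child, true) =>
      // Streaming dedup is planned as the dedicated stateful operator;
      // batch dedup is rewritten to an Aggregate by the optimizer instead.
      DeduplicationExec(keys, planLater(child)) :: Nil
    case _ => Nil
  }
}
```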





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101878435
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 ---
@@ -129,6 +156,18 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
{
 outputMode = Complete,
 expectedMsgs = Seq("(map/flatMap)GroupsWithState"))
 
+  // Deduplication:  Not supported after a streaming aggregation
--- End diff --

Actually, can you add a test for both dropDuplicates and mapGroupsWithState that 
checks these operations are allowed on a batch subplan inside a streaming 
plan?





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101878213
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 ---
@@ -98,6 +104,27 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
 outputMode = Update,
 expectedMsgs = Seq("multiple streaming aggregations"))
 
+  assertSupportedInStreamingPlan(
--- End diff --

great!!





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101878041
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicationSuite.scala 
---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.functions._
+
+class DeduplicationSuite extends StreamTest with BeforeAndAfterAll {
+
+  import testImplicits._
+
+  override def afterAll(): Unit = {
+super.afterAll()
+StateStore.stop()
+  }
+
+  test("deduplication") {
+val inputData = MemoryStream[String]
+val result = inputData.toDS().dropDuplicates()
+
+testStream(result, Append)(
+  AddData(inputData, "a"),
+  CheckLastBatch("a"),
+  assertNumStateRows(total = 1, updated = 1),
+  AddData(inputData, "a"),
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+  AddData(inputData, "b"),
+  CheckLastBatch("b"),
+  assertNumStateRows(total = 2, updated = 1)
+)
+  }
+
+  test("deduplication with columns") {
+val inputData = MemoryStream[(String, Int)]
+val result = inputData.toDS().dropDuplicates("_1")
+
+testStream(result, Append)(
+  AddData(inputData, "a" -> 1),
+  CheckLastBatch("a" -> 1),
+  assertNumStateRows(total = 1, updated = 1),
+  AddData(inputData, "a" -> 2), // Dropped
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+  AddData(inputData, "b" -> 1),
+  CheckLastBatch("b" -> 1),
+  assertNumStateRows(total = 2, updated = 1)
+)
+  }
+
+  test("multiple deduplications") {
+val inputData = MemoryStream[(String, Int)]
+val result = inputData.toDS().dropDuplicates().dropDuplicates("_1")
+
+testStream(result, Append)(
+  AddData(inputData, "a" -> 1),
+  CheckLastBatch("a" -> 1),
+  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+
+  AddData(inputData, "a" -> 2), // Dropped from the second 
`dropDuplicates`
+  CheckLastBatch(),
+  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L)),
+
+  AddData(inputData, "b" -> 1),
+  CheckLastBatch("b" -> 1),
+  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+)
+  }
+
+  test("deduplication with watermark") {
+val inputData = MemoryStream[Int]
+val result = inputData.toDS()
+  .withColumn("eventTime", $"value".cast("timestamp"))
+  .withWatermark("eventTime", "10 seconds")
+  .dropDuplicates()
+  .select($"eventTime".cast("long").as[Long])
+
+testStream(result, Append)(
+  AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+  CheckLastBatch(10 to 15: _*),
+  assertNumStateRows(total = 6, updated = 6),
+
+  AddData(inputData, 25), // Advance watermark to 15 seconds
+  CheckLastBatch(25),
+  assertNumStateRows(total = 7, updated = 1),
+
+  AddData(inputData, 25), // Drop states less than watermark
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+
+  AddData(inputData, 10), // Should not emit anything as data less 
than watermark
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+
+  AddData(inputData, 45), // Advance watermark to 35 seconds
+  CheckLastBatch(45),
+  assertNumStateRows(total = 2, updated = 1),
+
+  AddData(inputData, 45), // Drop states less than watermark
+  CheckLastBatch(),
+  assertNumSta

[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101878019
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicationSuite.scala 
---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.functions._
+
+class DeduplicationSuite extends StreamTest with BeforeAndAfterAll {
+
+  import testImplicits._
+
+  override def afterAll(): Unit = {
+super.afterAll()
+StateStore.stop()
+  }
+
+  test("deduplication") {
+val inputData = MemoryStream[String]
+val result = inputData.toDS().dropDuplicates()
+
+testStream(result, Append)(
+  AddData(inputData, "a"),
+  CheckLastBatch("a"),
+  assertNumStateRows(total = 1, updated = 1),
+  AddData(inputData, "a"),
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+  AddData(inputData, "b"),
+  CheckLastBatch("b"),
+  assertNumStateRows(total = 2, updated = 1)
+)
+  }
+
+  test("deduplication with columns") {
+val inputData = MemoryStream[(String, Int)]
+val result = inputData.toDS().dropDuplicates("_1")
+
+testStream(result, Append)(
+  AddData(inputData, "a" -> 1),
+  CheckLastBatch("a" -> 1),
+  assertNumStateRows(total = 1, updated = 1),
+  AddData(inputData, "a" -> 2), // Dropped
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+  AddData(inputData, "b" -> 1),
+  CheckLastBatch("b" -> 1),
+  assertNumStateRows(total = 2, updated = 1)
+)
+  }
+
+  test("multiple deduplications") {
+val inputData = MemoryStream[(String, Int)]
+val result = inputData.toDS().dropDuplicates().dropDuplicates("_1")
+
+testStream(result, Append)(
+  AddData(inputData, "a" -> 1),
+  CheckLastBatch("a" -> 1),
+  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+
+  AddData(inputData, "a" -> 2), // Dropped from the second 
`dropDuplicates`
+  CheckLastBatch(),
+  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L)),
+
+  AddData(inputData, "b" -> 1),
+  CheckLastBatch("b" -> 1),
+  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+)
+  }
+
+  test("deduplication with watermark") {
+val inputData = MemoryStream[Int]
+val result = inputData.toDS()
+  .withColumn("eventTime", $"value".cast("timestamp"))
+  .withWatermark("eventTime", "10 seconds")
+  .dropDuplicates()
+  .select($"eventTime".cast("long").as[Long])
+
+testStream(result, Append)(
+  AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+  CheckLastBatch(10 to 15: _*),
+  assertNumStateRows(total = 6, updated = 6),
+
+  AddData(inputData, 25), // Advance watermark to 15 seconds
+  CheckLastBatch(25),
+  assertNumStateRows(total = 7, updated = 1),
+
+  AddData(inputData, 25), // Drop states less than watermark
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+
+  AddData(inputData, 10), // Should not emit anything as data less 
than watermark
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+
+  AddData(inputData, 45), // Advance watermark to 35 seconds
+  CheckLastBatch(45),
+  assertNumStateRows(total = 2, updated = 1),
+
+  AddData(inputData, 45), // Drop states less than watermark
+  CheckLastBatch(),
+  assertNumSta

[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101878004
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicationSuite.scala 
---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.functions._
+
+class DeduplicationSuite extends StreamTest with BeforeAndAfterAll {
+
+  import testImplicits._
+
+  override def afterAll(): Unit = {
+super.afterAll()
+StateStore.stop()
+  }
+
+  test("deduplication") {
+val inputData = MemoryStream[String]
+val result = inputData.toDS().dropDuplicates()
+
+testStream(result, Append)(
+  AddData(inputData, "a"),
+  CheckLastBatch("a"),
+  assertNumStateRows(total = 1, updated = 1),
+  AddData(inputData, "a"),
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+  AddData(inputData, "b"),
+  CheckLastBatch("b"),
+  assertNumStateRows(total = 2, updated = 1)
+)
+  }
+
+  test("deduplication with columns") {
+val inputData = MemoryStream[(String, Int)]
+val result = inputData.toDS().dropDuplicates("_1")
+
+testStream(result, Append)(
+  AddData(inputData, "a" -> 1),
+  CheckLastBatch("a" -> 1),
+  assertNumStateRows(total = 1, updated = 1),
+  AddData(inputData, "a" -> 2), // Dropped
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+  AddData(inputData, "b" -> 1),
+  CheckLastBatch("b" -> 1),
+  assertNumStateRows(total = 2, updated = 1)
+)
+  }
+
+  test("multiple deduplications") {
+val inputData = MemoryStream[(String, Int)]
+val result = inputData.toDS().dropDuplicates().dropDuplicates("_1")
+
+testStream(result, Append)(
+  AddData(inputData, "a" -> 1),
+  CheckLastBatch("a" -> 1),
+  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+
+  AddData(inputData, "a" -> 2), // Dropped from the second 
`dropDuplicates`
+  CheckLastBatch(),
+  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L)),
+
+  AddData(inputData, "b" -> 1),
+  CheckLastBatch("b" -> 1),
+  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+)
+  }
+
+  test("deduplication with watermark") {
+val inputData = MemoryStream[Int]
+val result = inputData.toDS()
+  .withColumn("eventTime", $"value".cast("timestamp"))
+  .withWatermark("eventTime", "10 seconds")
+  .dropDuplicates()
+  .select($"eventTime".cast("long").as[Long])
+
+testStream(result, Append)(
+  AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+  CheckLastBatch(10 to 15: _*),
+  assertNumStateRows(total = 6, updated = 6),
+
+  AddData(inputData, 25), // Advance watermark to 15 seconds
+  CheckLastBatch(25),
+  assertNumStateRows(total = 7, updated = 1),
+
+  AddData(inputData, 25), // Drop states less than watermark
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+
+  AddData(inputData, 10), // Should not emit anything as data less 
than watermark
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+
+  AddData(inputData, 45), // Advance watermark to 35 seconds
+  CheckLastBatch(45),
+  assertNumStateRows(total = 2, updated = 1),
+
+  AddData(inputData, 45), // Drop states less than watermark
+  CheckLastBatch(),
+  assertNumSta

[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101877959
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicationSuite.scala 
---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.functions._
+
+class DeduplicationSuite extends StreamTest with BeforeAndAfterAll {
+
+  import testImplicits._
+
+  override def afterAll(): Unit = {
+super.afterAll()
+StateStore.stop()
+  }
+
+  test("deduplication") {
--- End diff --

nit: deduplication with all columns





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101877982
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicationSuite.scala 
---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.functions._
+
+class DeduplicationSuite extends StreamTest with BeforeAndAfterAll {
+
+  import testImplicits._
+
+  override def afterAll(): Unit = {
+super.afterAll()
+StateStore.stop()
+  }
+
+  test("deduplication") {
+val inputData = MemoryStream[String]
+val result = inputData.toDS().dropDuplicates()
+
+testStream(result, Append)(
+  AddData(inputData, "a"),
+  CheckLastBatch("a"),
+  assertNumStateRows(total = 1, updated = 1),
+  AddData(inputData, "a"),
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+  AddData(inputData, "b"),
+  CheckLastBatch("b"),
+  assertNumStateRows(total = 2, updated = 1)
+)
+  }
+
+  test("deduplication with columns") {
--- End diff --

nit: deduplication with some columns





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101877518
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -338,7 +338,7 @@ class StreamSuite extends StreamTest {
 .writeStream
 .format("memory")
 .queryName("testquery")
-.outputMode("complete")
+.outputMode("append")
--- End diff --

I see. This was allowed earlier but not any more. I think this is not consistent 
with the fact that we don't allow Complete mode in map-like queries.





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101877274
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicationSuite.scala 
---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.functions._
+
+class DeduplicationSuite extends StreamTest with BeforeAndAfterAll {
+
+  import testImplicits._
+
+  override def afterAll(): Unit = {
+super.afterAll()
+StateStore.stop()
+  }
+
+  test("deduplication") {
+val inputData = MemoryStream[String]
+val result = inputData.toDS().dropDuplicates()
+
+testStream(result, Append)(
+  AddData(inputData, "a"),
+  CheckLastBatch("a"),
+  assertNumStateRows(total = 1, updated = 1),
+  AddData(inputData, "a"),
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+  AddData(inputData, "b"),
+  CheckLastBatch("b"),
+  assertNumStateRows(total = 2, updated = 1)
+)
+  }
+
+  test("deduplication with columns") {
+val inputData = MemoryStream[(String, Int)]
+val result = inputData.toDS().dropDuplicates("_1")
+
+testStream(result, Append)(
+  AddData(inputData, "a" -> 1),
+  CheckLastBatch("a" -> 1),
+  assertNumStateRows(total = 1, updated = 1),
+  AddData(inputData, "a" -> 2), // Dropped
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+  AddData(inputData, "b" -> 1),
+  CheckLastBatch("b" -> 1),
+  assertNumStateRows(total = 2, updated = 1)
+)
+  }
+
+  test("multiple deduplications") {
+val inputData = MemoryStream[(String, Int)]
+val result = inputData.toDS().dropDuplicates().dropDuplicates("_1")
+
+testStream(result, Append)(
+  AddData(inputData, "a" -> 1),
+  CheckLastBatch("a" -> 1),
+  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+
+  AddData(inputData, "a" -> 2), // Dropped from the second 
`dropDuplicates`
+  CheckLastBatch(),
+  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L)),
+
+  AddData(inputData, "b" -> 1),
+  CheckLastBatch("b" -> 1),
+  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+)
+  }
+
+  test("deduplication with watermark") {
+val inputData = MemoryStream[Int]
+val result = inputData.toDS()
+  .withColumn("eventTime", $"value".cast("timestamp"))
+  .withWatermark("eventTime", "10 seconds")
+  .dropDuplicates()
+  .select($"eventTime".cast("long").as[Long])
+
+testStream(result, Append)(
+  AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+  CheckLastBatch(10 to 15: _*),
+  assertNumStateRows(total = 6, updated = 6),
+
+  AddData(inputData, 25), // Advance watermark to 15 seconds
+  CheckLastBatch(25),
+  assertNumStateRows(total = 7, updated = 1),
+
+  AddData(inputData, 25), // Drop states less than watermark
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+
+  AddData(inputData, 10), // Should not emit anything as data less 
than watermark
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+
+  AddData(inputData, 45), // Advance watermark to 35 seconds
+  CheckLastBatch(45),
+  assertNumStateRows(total = 2, updated = 1),
+
+  AddData(inputData, 45), // Drop states less than watermark
+  CheckLastBatch(),
+  assertNumSta

[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101872099
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -321,3 +327,66 @@ case class MapGroupsWithStateExec(
   }
   }
 }
+
+
+/** Physical operator for executing streaming Deduplication. */
+case class DeduplicationExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateId: Option[OperatorStateId] = None,
+eventTimeWatermark: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(keyExpressions) :: Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+metrics // force lazy init at driver
+
+child.execute().mapPartitionsWithStateStore(
+  getStateId.checkpointLocation,
+  operatorId = getStateId.operatorId,
+  storeVersion = getStateId.batchId,
+  keyExpressions.toStructType,
+  child.output.toStructType,
+  sqlContext.sessionState,
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+  val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+  val numOutputRows = longMetric("numOutputRows")
+  val numTotalStateRows = longMetric("numTotalStateRows")
+  val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+
+  val baseIterator = watermarkPredicate match {
+case Some(predicate) => iter.filter((row: InternalRow) => 
!predicate.eval(row))
+case None => iter
+  }
+
+  while (baseIterator.hasNext) {
+val row = baseIterator.next().asInstanceOf[UnsafeRow]
+val key = getKey(row)
+val value = store.get(key)
+if (value.isEmpty) {
+  store.put(key.copy(), row.copy())
--- End diff --

Doesn't work :(
```
java.lang.AssertionError: index (0) should < 0
at 
org.apache.spark.sql.catalyst.expressions.UnsafeRow.assertIndexIsValid(UnsafeRow.java:133)
at 
org.apache.spark.sql.catalyst.expressions.UnsafeRow.isNullAt(UnsafeRow.java:352)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:113)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

```





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101871892
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -321,3 +327,66 @@ case class MapGroupsWithStateExec(
   }
   }
 }
+
+
+/** Physical operator for executing streaming Deduplication. */
+case class StreamingDeduplicationExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateId: Option[OperatorStateId] = None,
+eventTimeWatermark: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(keyExpressions) :: Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+metrics // force lazy init at driver
+
+child.execute().mapPartitionsWithStateStore(
+  getStateId.checkpointLocation,
+  operatorId = getStateId.operatorId,
--- End diff --

nit: why are these two specified with param names?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101871744
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -321,3 +327,66 @@ case class MapGroupsWithStateExec(
   }
   }
 }
+
+
+/** Physical operator for executing streaming Deduplication. */
+case class StreamingDeduplicationExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateId: Option[OperatorStateId] = None,
+eventTimeWatermark: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(keyExpressions) :: Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+metrics // force lazy init at driver
+
+child.execute().mapPartitionsWithStateStore(
+  getStateId.checkpointLocation,
+  operatorId = getStateId.operatorId,
+  storeVersion = getStateId.batchId,
+  keyExpressions.toStructType,
+  child.output.toStructType,
+  sqlContext.sessionState,
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+  val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+  val numOutputRows = longMetric("numOutputRows")
+  val numTotalStateRows = longMetric("numTotalStateRows")
+  val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+
--- End diff --

extra line





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101871632
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -321,3 +327,66 @@ case class MapGroupsWithStateExec(
   }
   }
 }
+
+
+/** Physical operator for executing streaming Deduplication. */
+case class DeduplicationExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateId: Option[OperatorStateId] = None,
+eventTimeWatermark: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(keyExpressions) :: Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+metrics // force lazy init at driver
+
+child.execute().mapPartitionsWithStateStore(
+  getStateId.checkpointLocation,
+  operatorId = getStateId.operatorId,
+  storeVersion = getStateId.batchId,
+  keyExpressions.toStructType,
+  child.output.toStructType,
+  sqlContext.sessionState,
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+  val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+  val numOutputRows = longMetric("numOutputRows")
+  val numTotalStateRows = longMetric("numTotalStateRows")
+  val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+
+  val baseIterator = watermarkPredicate match {
+case Some(predicate) => iter.filter((row: InternalRow) => 
!predicate.eval(row))
+case None => iter
+  }
+
+  while (baseIterator.hasNext) {
+val row = baseIterator.next().asInstanceOf[UnsafeRow]
+val key = getKey(row)
+val value = store.get(key)
+if (value.isEmpty) {
+  store.put(key.copy(), row.copy())
--- End diff --

naah. The HDFSBackedStateStore can't handle nulls. How about using 
`UnsafeRow.createFromByteArray(0, 0)`? We can reuse this immutable object.
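
A minimal sketch of that idea, against the dedup loop in the diff above; the `EMPTY_ROW` name and its companion-object placement are assumptions for illustration, not code from the PR:

```
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

object StreamingDeduplicationExec {
  // One shared, zero-byte, zero-field row used as the stored value, so the
  // state store effectively only pays for the key. Safe to share because it
  // is never mutated.
  private val EMPTY_ROW: UnsafeRow = UnsafeRow.createFromByteArray(0, 0)
}

// Inside doExecute's per-partition loop, instead of storing row.copy():
//   if (store.get(key).isEmpty) {
//     store.put(key.copy(), StreamingDeduplicationExec.EMPTY_ROW)
//     numUpdatedStateRows += 1
//     // emit the row downstream
//   }
```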





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101871507
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -321,3 +327,66 @@ case class MapGroupsWithStateExec(
   }
   }
 }
+
+
+/** Physical operator for executing streaming Deduplication. */
+case class DeduplicationExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateId: Option[OperatorStateId] = None,
+eventTimeWatermark: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(keyExpressions) :: Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+metrics // force lazy init at driver
+
+child.execute().mapPartitionsWithStateStore(
+  getStateId.checkpointLocation,
+  operatorId = getStateId.operatorId,
+  storeVersion = getStateId.batchId,
+  keyExpressions.toStructType,
+  child.output.toStructType,
+  sqlContext.sessionState,
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+  val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+  val numOutputRows = longMetric("numOutputRows")
+  val numTotalStateRows = longMetric("numTotalStateRows")
+  val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+
+  val baseIterator = watermarkPredicate match {
+case Some(predicate) => iter.filter((row: InternalRow) => 
!predicate.eval(row))
+case None => iter
+  }
+
+  while (baseIterator.hasNext) {
+val row = baseIterator.next().asInstanceOf[UnsafeRow]
+val key = getKey(row)
+val value = store.get(key)
+if (value.isEmpty) {
+  store.put(key.copy(), row.copy())
--- End diff --

No. StateStore assumes `value` is not null.





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101871078
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -321,3 +327,66 @@ case class MapGroupsWithStateExec(
   }
   }
 }
+
+
+/** Physical operator for executing streaming Deduplication. */
+case class DeduplicationExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateId: Option[OperatorStateId] = None,
+eventTimeWatermark: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(keyExpressions) :: Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+metrics // force lazy init at driver
+
+child.execute().mapPartitionsWithStateStore(
+  getStateId.checkpointLocation,
+  operatorId = getStateId.operatorId,
+  storeVersion = getStateId.batchId,
+  keyExpressions.toStructType,
+  child.output.toStructType,
+  sqlContext.sessionState,
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+  val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+  val numOutputRows = longMetric("numOutputRows")
+  val numTotalStateRows = longMetric("numTotalStateRows")
+  val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+
+  val baseIterator = watermarkPredicate match {
+case Some(predicate) => iter.filter((row: InternalRow) => 
!predicate.eval(row))
+case None => iter
+  }
+
+  while (baseIterator.hasNext) {
+val row = baseIterator.next().asInstanceOf[UnsafeRow]
+val key = getKey(row)
+val value = store.get(key)
+if (value.isEmpty) {
+  store.put(key.copy(), row.copy())
--- End diff --

can you store a null?





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101870839
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -68,6 +67,37 @@ trait StateStoreWriter extends StatefulOperator {
 "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number 
of updated state rows"))
 }
 
+trait WatermarkSupport extends SparkPlan {
+
+  def keyExpressions: Seq[Attribute]
--- End diff --

docs





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101870824
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -68,6 +67,37 @@ trait StateStoreWriter extends StatefulOperator {
 "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number 
of updated state rows"))
 }
 
+trait WatermarkSupport extends SparkPlan {
--- End diff --

docs.





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101870674
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -249,6 +253,17 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
*/
   object Aggregation extends Strategy {
 def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+  case Deduplication(keys, child) =>
--- End diff --

Shouldn't there be a new strategy? After all, dropping duplicates is not 
conceptually an aggregation; it just so happens that it can be implemented as 
an aggregation.
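
A rough sketch of what a dedicated strategy could look like, placed in SparkStrategies.scala next to the existing strategies (so `planLater` and the surrounding imports apply); the strategy name is an assumption, and `Deduplication`/`DeduplicationExec` mirror this PR's nodes:

```
// Hypothetical: plan the streaming dedup node outside of the Aggregation strategy.
object StreamingDeduplicationStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case Deduplication(keys, child) =>
      DeduplicationExec(keys, planLater(child)) :: Nil
    case _ => Nil
  }
}
```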





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101870219
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -869,3 +869,11 @@ case object OneRowRelation extends LeafNode {
   override def output: Seq[Attribute] = Nil
   override def computeStats(conf: CatalystConf): Statistics = 
Statistics(sizeInBytes = 1)
 }
+
+/** A logical plan for `dropDuplicates`. */
+case class Deduplication(
+  keys: Seq[Attribute],
--- End diff --

indent. This can be on the same line, I think.





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101867628
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -338,7 +338,7 @@ class StreamSuite extends StreamTest {
 .writeStream
 .format("memory")
 .queryName("testquery")
-.outputMode("complete")
+.outputMode("append")
--- End diff --

This is a behavior change: the user cannot use `dropDuplicates` with 
`complete` or `update` without aggregation now.





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101866240
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2006,15 +2006,19 @@ class Dataset[T] private[sql](
   }
   cols
 }
-val groupColExprIds = groupCols.map(_.exprId)
-val aggCols = logicalPlan.output.map { attr =>
-  if (groupColExprIds.contains(attr.exprId)) {
-attr
-  } else {
-Alias(new First(attr).toAggregateExpression(), attr.name)()
+if (isStreaming) {
+  Deduplication(groupCols, logicalPlan)
+} else {
+  val groupColExprIds = groupCols.map(_.exprId)
+  val aggCols = logicalPlan.output.map { attr =>
+if (groupColExprIds.contains(attr.exprId)) {
+  attr
+} else {
+  Alias(new First(attr).toAggregateExpression(), attr.name)()
--- End diff --

Yeah, it works.





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101862301
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2006,15 +2006,19 @@ class Dataset[T] private[sql](
   }
   cols
 }
-val groupColExprIds = groupCols.map(_.exprId)
-val aggCols = logicalPlan.output.map { attr =>
-  if (groupColExprIds.contains(attr.exprId)) {
-attr
-  } else {
-Alias(new First(attr).toAggregateExpression(), attr.name)()
+if (isStreaming) {
+  Deduplication(groupCols, logicalPlan)
+} else {
+  val groupColExprIds = groupCols.map(_.exprId)
+  val aggCols = logicalPlan.output.map { attr =>
+if (groupColExprIds.contains(attr.exprId)) {
+  attr
+} else {
+  Alias(new First(attr).toAggregateExpression(), attr.name)()
--- End diff --

You could do this construction at planning time if you preserve the 
attribute ids?
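
A hedged sketch of that idea as a planning-time rewrite; the rule name is made up here, and it reuses the PR's `Deduplication` node plus the `First`-based construction already shown in the `Dataset.dropDuplicates` diff above:

```
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.expressions.aggregate.First
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplication, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object ReplaceDeduplicationWithAggregate extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Deduplication(keys, child) if !child.isStreaming =>
      val keyExprIds = keys.map(_.exprId)
      val aggCols = child.output.map { attr =>
        if (keyExprIds.contains(attr.exprId)) {
          attr
        } else {
          // Keep the original name and exprId so downstream references to the
          // child's output attributes still resolve after the rewrite.
          Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId)
        }
      }
      Aggregate(keys, aggCols, child)
  }
}
```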





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101858190
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2006,15 +2006,19 @@ class Dataset[T] private[sql](
   }
   cols
 }
-val groupColExprIds = groupCols.map(_.exprId)
-val aggCols = logicalPlan.output.map { attr =>
-  if (groupColExprIds.contains(attr.exprId)) {
-attr
-  } else {
-Alias(new First(attr).toAggregateExpression(), attr.name)()
+if (isStreaming) {
+  Deduplication(groupCols, logicalPlan)
+} else {
+  val groupColExprIds = groupCols.map(_.exprId)
+  val aggCols = logicalPlan.output.map { attr =>
+if (groupColExprIds.contains(attr.exprId)) {
+  attr
+} else {
+  Alias(new First(attr).toAggregateExpression(), attr.name)()
--- End diff --

@marmbrus I tried to move this to SparkPlanner but failed because 
`Alias(new First(attr).toAggregateExpression(), attr.name)()` needs to be 
resolved before planning. Thoughts?





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101834289
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -35,6 +35,9 @@ object UnsupportedOperationChecker {
   case p if p.isStreaming =>
 throwError("Queries with streaming sources must be executed with 
writeStream.start()")(p)
 
+  case p: Deduplication =>
+throwError("Batch queries should not use Deduplication")(p)
--- End diff --

Why is deduplication exclusive to streaming?  Even if we don't want to 
implement a batch operator, I'd still allow it in the logical plan and just 
translate it to normal aggregation.





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101825645
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -112,6 +128,12 @@ object UnsupportedOperationChecker {
   "it is on aggregated DataFrame/Dataset in Complete output 
mode. Consider using " +
   "approximate distinct aggregation (e.g. 
approx_count_distinct() instead of count()).")
 
+  throwErrorIf(
+outputMode == InternalOutputModes.Complete
+  && collectStreamingDeduplications(subPlan).nonEmpty,
+"Aggregation on dropDuplicates DataFrame/Dataset in Complete 
output mode " +
--- End diff --

why not?





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101824321
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -35,6 +35,9 @@ object UnsupportedOperationChecker {
   case p if p.isStreaming =>
 throwError("Queries with streaming sources must be executed with 
writeStream.start()")(p)
 
+  case p: Deduplication =>
+throwError("Batch queries should not use Deduplication")(p)
--- End diff --

Because of this, I would prefer the naming to imply that as well. Maybe 
rename `Deduplication` to `StreamingDeduplication` or something.





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101825992
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -869,3 +869,11 @@ case object OneRowRelation extends LeafNode {
   override def output: Seq[Attribute] = Nil
   override def computeStats(conf: CatalystConf): Statistics = 
Statistics(sizeInBytes = 1)
 }
+
+/** Streaming dropDuplicates */
+case class Deduplication(
--- End diff --

ditto: IMHO the name should reflect that it is streaming.





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101824789
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -60,6 +68,14 @@ object UnsupportedOperationChecker {
   "streaming DataFrames/Datasets")(plan)
 }
 
+// Disallow multiple streaming deduplications
--- End diff --

We should support these. Example use case: I dedup on some higher-level 
columns to gain exactly-once semantics (infrastructure/application specific). 
Then I do data transformations, and then I dedup on some more specific data, 
e.g. region (query specific).
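
Purely as an illustration of that shape (the `events` stream, the `enrich` function, and the column names are assumed, not from this PR):

```
// Two dropDuplicates at different granularities in one streaming query.
val query = events                                   // some streaming Dataset
  .dropDuplicates("eventId")                         // infra-level exactly-once dedup
  .transform(enrich)                                 // arbitrary transformations
  .dropDuplicates("region", "userId", "eventDate")   // query-specific dedup
  .writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "/tmp/deduped")
  .option("checkpointLocation", "/tmp/deduped-ckpt")
  .start()
```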





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101827697
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicationSuite.scala 
---
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.functions._
+
+class DeduplicationSuite extends StreamTest with BeforeAndAfterAll {
+
+  import testImplicits._
+
+  override def afterAll(): Unit = {
+super.afterAll()
+StateStore.stop()
+  }
+
+  test("deduplication - complete") {
+val inputData = MemoryStream[String]
+val result = inputData.toDS().dropDuplicates()
+
+testStream(result, Complete)(
+  AddData(inputData, "a"),
+  CheckLastBatch("a"),
+  assertNumStateRows(total = 1, updated = 1),
+  AddData(inputData, "a"),
+  CheckLastBatch("a"),
+  assertNumStateRows(total = 1, updated = 0),
+  AddData(inputData, "b"),
+  CheckLastBatch("a", "b"),
+  assertNumStateRows(total = 2, updated = 1)
+)
+  }
+
+  test("deduplication - append/update") {
+val inputData = MemoryStream[String]
+val result = inputData.toDS().dropDuplicates()
+
+testStream(result, Append)(
+  AddData(inputData, "a"),
+  CheckLastBatch("a"),
+  assertNumStateRows(total = 1, updated = 1),
+  AddData(inputData, "a"),
+  CheckLastBatch(),
+  assertNumStateRows(total = 1, updated = 0),
+  AddData(inputData, "b"),
+  CheckLastBatch("b"),
+  assertNumStateRows(total = 2, updated = 1)
+)
+  }
+
+  test("deduplication with columns - complete") {
+val inputData = MemoryStream[(String, Int)]
+val result = inputData.toDS().dropDuplicates("_1")
+
+testStream(result, Complete)(
+  AddData(inputData, "a" -> 1),
+  CheckLastBatch("a" -> 1),
+  assertNumStateRows(total = 1, updated = 1),
+  AddData(inputData, "a" -> 2), // Dropped
+  CheckLastBatch("a" -> 1),
+  assertNumStateRows(total = 1, updated = 0),
+  AddData(inputData, "b" -> 1),
+  CheckLastBatch("a" -> 1, "b" -> 1),
+  assertNumStateRows(total = 2, updated = 1)
+)
+  }
+
+  test("deduplication with columns - append/update") {
+val inputData = MemoryStream[(String, Int)]
+val result = inputData.toDS().dropDuplicates("_1")
+
+testStream(result, Append)(
--- End diff --

I know the semantics are the same for `Append` and `Update`, but just so 
that no one breaks it in the future, should we wrap these tests with 
`Seq(Append, Update).foreach { mode => ... }`?
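
For instance, a sketch of that wrapping, reusing the helpers already in the suite (the test body is abbreviated from the diff above):

```
Seq(Append, Update).foreach { mode =>
  test(s"deduplication - $mode mode") {
    val inputData = MemoryStream[String]
    val result = inputData.toDS().dropDuplicates()

    testStream(result, mode)(
      AddData(inputData, "a"),
      CheckLastBatch("a"),
      assertNumStateRows(total = 1, updated = 1),
      AddData(inputData, "a"),
      CheckLastBatch(),   // duplicate dropped, nothing new emitted
      assertNumStateRows(total = 1, updated = 0),
      AddData(inputData, "b"),
      CheckLastBatch("b"),
      assertNumStateRows(total = 2, updated = 1)
    )
  }
}
```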





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101822290
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -321,3 +327,66 @@ case class MapGroupsWithStateExec(
   }
   }
 }
+
+
+/** Physical operator for executing streaming Deduplication. */
+case class DeduplicationExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateId: Option[OperatorStateId] = None,
+eventTimeWatermark: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(keyExpressions) :: Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+metrics // force lazy init at driver
+
+child.execute().mapPartitionsWithStateStore(
+  getStateId.checkpointLocation,
+  operatorId = getStateId.operatorId,
+  storeVersion = getStateId.batchId,
+  keyExpressions.toStructType,
+  child.output.toStructType,
+  sqlContext.sessionState,
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+  val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+  val numOutputRows = longMetric("numOutputRows")
+  val numTotalStateRows = longMetric("numTotalStateRows")
+  val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
+
+  val baseIterator = watermarkPredicate match {
+case Some(predicate) => iter.filter((row: InternalRow) => 
!predicate.eval(row))
+case None => iter
+  }
+
+  while (baseIterator.hasNext) {
+val row = baseIterator.next().asInstanceOf[UnsafeRow]
+val key = getKey(row)
+val value = store.get(key)
+if (value.isEmpty) {
+  store.put(key.copy(), row.copy())
--- End diff --

I don't know how to create an empty UnsafeRow. Right now the value is not 
necessary, but it doubles the size of the state store.





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16970#discussion_r101821462
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 ---
@@ -98,6 +104,27 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
 outputMode = Update,
 expectedMsgs = Seq("multiple streaming aggregations"))
 
+  assertSupportedInStreamingPlan(
--- End diff --

Added some missing tests.





[GitHub] spark pull request #16970: [SPARK-19497][SS]Implement streaming deduplicatio...

2017-02-16 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/16970

[SPARK-19497][SS]Implement streaming deduplication

## What changes were proposed in this pull request?

This PR adds a special streaming deduplication operator to support 
`dropDuplicates` with aggregation and watermark. It reuses the 
`dropDuplicates` API but creates a new logical plan, `Deduplication`, and a new 
physical plan, `DeduplicationExec`.

The following cases are supported:

- dropDuplicates() (one `dropDuplicates` with any output mode)
- `withWatermark(...).dropDuplicates().groupBy(...)` with `outputMode("append")`
- `dropDuplicates().groupBy(...)` with `outputMode("update")`

Not supported cases:

- dropDuplicates().dropDuplicates() (multiple `dropDuplicates`s)
- groupBy(...).dropDuplicates() (`dropDuplicates` after `aggregation`)
- `dropDuplicates().groupBy(...)` with `outputMode("complete")`
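
For illustration, a minimal sketch of the watermark + dropDuplicates + aggregation pattern from the supported list above (the `events` stream, column names, intervals, and sink settings are assumptions, not part of this PR):

```
import org.apache.spark.sql.functions.{col, window}

val deduped = events                                // a streaming DataFrame (assumed)
  .withWatermark("eventTime", "10 minutes")         // bounds how long dedup state is kept
  .dropDuplicates("eventId", "eventTime")
  .groupBy(window(col("eventTime"), "5 minutes"))
  .count()

deduped.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "/tmp/dedup-out")
  .option("checkpointLocation", "/tmp/dedup-ckpt")
  .start()
```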

## How was this patch tested?

The new unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark dedup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16970.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16970


commit 63a7f4c62b2da32351d008f9719d513e14562e56
Author: Shixiong Zhu 
Date:   2017-02-15T19:01:57Z

Implement deduplication



