[GitHub] spark issue #16043: [SPARK-18601][SQL] Simplify Create/Get complex expressio...

2017-01-31 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16043
  
**[Test build #72209 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72209/testReport)** for PR 16043 at commit [`bd27a2b`](https://github.com/apache/spark/commit/bd27a2b029c9b9f4e7743446a5bfab4ab14dfca8).





[GitHub] spark issue #16740: [SPARK-19400][ML] Allow GLM to handle intercept only mod...

2017-01-31 Thread actuaryzhang
Github user actuaryzhang commented on the issue:

https://github.com/apache/spark/pull/16740
  
@sethah Yes, we can directly compute the intercept easily. But I'm concerned that such special handling may not integrate well with other features or future changes. For example, we would also need to compute the standard error analytically, which is not difficult. But the point is that every time there is a new feature, one would have to modify the intercept calculation to handle it. This does not seem efficient. Thoughts?
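
For concreteness, a minimal sketch of the direct computation being discussed: for an intercept-only GLM, the MLE of the mean is the weighted mean of the labels, and the intercept is the link function applied to that mean. The data and the choice of logit link below are made-up illustrations, not Spark ML internals:

```scala
// Hedged sketch: intercept of an intercept-only GLM, computed directly.
val labelsAndWeights = Seq((1.0, 1.0), (0.0, 2.0), (1.0, 1.0)) // (label, weight); made-up data
// Weighted mean of the labels is the fitted mean for an intercept-only model.
val mu = labelsAndWeights.map { case (y, w) => y * w }.sum / labelsAndWeights.map(_._2).sum
val logit: Double => Double = m => math.log(m / (1.0 - m))     // e.g. the binomial canonical link
val intercept = logit(mu)                                      // link applied to the mean
```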





[GitHub] spark pull request #16760: [SPARK-18872][SQL][TESTS] New test cases for EXIS...

2017-01-31 Thread dilipbiswal
GitHub user dilipbiswal opened a pull request:

https://github.com/apache/spark/pull/16760

[SPARK-18872][SQL][TESTS] New test cases for EXISTS subquery (Aggregate, 
Having, Orderby, Limit)



## What changes were proposed in this pull request?
This PR adds the second set of tests for EXISTS subquery.  

File name | Brief description
--- | ---
exists-aggregate.sql | Tests aggregate expressions in outer query and EXISTS subquery.
exists-having.sql | Tests HAVING clause in subquery.
exists-orderby-limit.sql | Tests EXISTS subquery support with ORDER BY and LIMIT clauses.
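
For illustration, a query of the shape exercised by `exists-aggregate.sql` might look like the following. This is a hedged sketch: `emp` and `dept` are invented tables, not taken from the test files, and an existing `spark` SparkSession is assumed.

```scala
// Illustrative only: an aggregate in the outer query plus a correlated EXISTS subquery.
spark.sql("""
  SELECT emp.dept_id, count(*) AS cnt
  FROM emp
  WHERE EXISTS (SELECT 1 FROM dept WHERE dept.dept_id = emp.dept_id)
  GROUP BY emp.dept_id
""")
```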

DB2 results are attached here as reference :


[exists-aggregate-db2.txt](https://github.com/apache/spark/files/743287/exists-aggregate-db2.txt)

[exists-having-db2.txt](https://github.com/apache/spark/files/743286/exists-having-db2.txt)

[exists-orderby-limit-db2.txt](https://github.com/apache/spark/files/743288/exists-orderby-limit-db2.txt)




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dilipbiswal/spark exists-pr2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16760.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16760









[GitHub] spark issue #16760: [SPARK-18872][SQL][TESTS] New test cases for EXISTS subq...

2017-01-31 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16760
  
**[Test build #72208 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72208/testReport)** for PR 16760 at commit [`2473e0c`](https://github.com/apache/spark/commit/2473e0c440a9d1cd761ae6d704d0aa02c63afd83).





[GitHub] spark issue #16759: [SPARK-18871][SQL][TESTS] New test cases for IN/NOT IN s...

2017-01-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16759
  
Can one of the admins verify this patch?





[GitHub] spark pull request #16650: [SPARK-16554][CORE] Automatically Kill Executors ...

2017-01-31 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/16650#discussion_r98782219
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -187,6 +198,19 @@ private[scheduler] class BlacklistTracker (
   nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
   listenerBus.post(SparkListenerNodeBlacklisted(now, node, 
blacklistedExecsOnNode.size))
   _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+  if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+allocationClient match {
+  case Some(allocationClient) =>
+logInfo(s"Killing all executors on blacklisted host $node 
" +
+  s"since spark.blacklist.kill is set.")
+if(allocationClient.killExecutorsOnHost(node) == false) {
--- End diff --

nit: space after `if`, before `(`





[GitHub] spark issue #14725: [SPARK-17161] [PYSPARK][ML] Add PySpark-ML JavaWrapper c...

2017-01-31 Thread holdenk
Github user holdenk commented on the issue:

https://github.com/apache/spark/pull/14725
  
LGTM - thanks for doing this - please ping me on the follow up PRs with the 
`CountVectorizerModel`. Before merge would you mind updating the PR description 
for how this was tested to remove the note about the `CountVectorizerModel` 
since it isn't included in this PR?





[GitHub] spark pull request #16759: [SPARK-18871][SQL][TESTS] New test cases for IN/N...

2017-01-31 Thread kevinyu98
GitHub user kevinyu98 opened a pull request:

https://github.com/apache/spark/pull/16759

[SPARK-18871][SQL][TESTS] New test cases for IN/NOT IN subquery 2nd batch

## What changes were proposed in this pull request?

This is the 2nd batch of test cases for IN/NOT IN subqueries. This PR includes these test cases:
`in-limit.sql`
`in-order-by.sql`
`not-in-group-by.sql`
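
For illustration, a query of the shape exercised by `in-limit.sql` might look like the following. This is a hedged sketch: `t1` and `t2` are invented tables, not taken from the test files, and an existing `spark` SparkSession is assumed.

```scala
// Illustrative only: an IN subquery bounded by ORDER BY / LIMIT.
spark.sql("""
  SELECT t1.id
  FROM t1
  WHERE t1.id IN (SELECT t2.id FROM t2 ORDER BY t2.id LIMIT 10)
""")
```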

These are the queries and results from running on DB2.
[in-limit DB2 
version](https://github.com/apache/spark/files/743267/in-limit.sql.db2.out.txt)
[in-order-by DB2 
version](https://github.com/apache/spark/files/743269/in-order-by.sql.db2.txt)
[not-in-group-by DB2 
version](https://github.com/apache/spark/files/743271/not-in-group-by.sql.db2.txt)
[output of in-limit.sql 
DB2](https://github.com/apache/spark/files/743276/in-limit.sql.db2.out.txt)
[output of in-order-by.sql 
DB2](https://github.com/apache/spark/files/743278/in-order-by.sql.db2.out.txt)
[output of not-in-group-by.sql 
DB2](https://github.com/apache/spark/files/743279/not-in-group-by.sql.db2.out.txt)

## How was this patch tested?

This PR adds new test cases.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kevinyu98/spark spark-18871-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16759.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16759









[GitHub] spark pull request #16650: [SPARK-16554][CORE] Automatically Kill Executors ...

2017-01-31 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/16650#discussion_r98781265
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -187,6 +198,19 @@ private[scheduler] class BlacklistTracker (
   nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
   listenerBus.post(SparkListenerNodeBlacklisted(now, node, 
blacklistedExecsOnNode.size))
   _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+  if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+allocationClient match {
+  case Some(allocationClient) =>
+logInfo(s"Killing all executors on blacklisted host $node 
" +
+  s"since spark.blacklist.kill is set.")
--- End diff --

config name wrong





[GitHub] spark pull request #16650: [SPARK-16554][CORE] Automatically Kill Executors ...

2017-01-31 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/16650#discussion_r98781173
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -173,6 +174,16 @@ private[scheduler] class BlacklistTracker (
 listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, 
newTotal))
 executorIdToFailureList.remove(exec)
 updateNextExpiryTime()
+if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+  allocationClient match {
+case Some(allocationClient) =>
+  logInfo(s"Killing blacklisted executor id $exec since 
spark.blacklist.kill is set.")
--- End diff --

the config name is wrong here now





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98778359
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -313,6 +313,25 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 }
   }
 
+  /**
+   * Strategy to convert MapGroupsWithState logical operator to physical 
operator
+   * in streaming plans. Conversion for batch plans is handled by 
[[BasicOperators]].
+   */
+  object MapGroupsWithStateStrategy extends Strategy {
+override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+  case MapGroupsWithState(
+func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr,
--- End diff --

I think we normally indent 4 (or 2) spaces past the class declaration.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98776316
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /**
+   * ::Experimental::
+   * (Scala-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
+   * For each unique group, the function will be passed the group key and 
an iterator that contains
--- End diff --

I would consider breaking this up to make it a little easier to follow:

```
For each unique group, the given function will be invoked with the 
following arguments:
 - The key of the group.
 - A user-defined state object set by previous invocations of the given 
function.  Note that, for batch queries, there is only ever one invocation and 
thus the state object will always be empty.
 - An iterator containing all the values for this key.
```





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98774221
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /**
+   * ::Experimental::
+   * (Scala-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
+   * For each unique group, the function will be passed the group key and 
an iterator that contains
+   * all of the elements in the group. The function can return an object 
of arbitrary type, and
+   * optionally update or remove the corresponding state. The returned 
object will form a new
+   * [[Dataset]].
+   *
+   * This function can be applied on both batch and streaming Datasets. 
With a streaming dataset,
+   * this function will be once for each in every trigger. For each key, 
the updated state from the
--- End diff --

will be *called*





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98779817
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -235,3 +240,86 @@ case class StateStoreSaveExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 }
+
+
+/**
+ * Physical operator for executing streaming mapGroupsWithState.
+ */
+case class MapGroupsWithStateExec(
+func: (Any, Iterator[Any], LogicalState[Any]) => Iterator[Any],
+keyDeserializer: Expression,   // probably not needed
+valueDeserializer: Expression,
+groupingAttributes: Seq[Attribute],
+dataAttributes: Seq[Attribute],
+outputObjAttr: Attribute,
+stateId: Option[OperatorStateId],
+stateDeserializer: Expression,
+stateSerializer: Seq[NamedExpression],
+child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with 
StateStoreWriter {
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))   // is this 
ordering needed?
+
+  override protected def doExecute(): RDD[InternalRow] = {
+
+child.execute().mapPartitionsWithStateStore[InternalRow](
+  getStateId.checkpointLocation,
+  operatorId = getStateId.operatorId,
+  storeVersion = getStateId.batchId,
+  groupingAttributes.toStructType,
+  child.output.toStructType,
+  sqlContext.sessionState,
+  Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
+try {
--- End diff --

Existing issue, but should `mapPartitionsWithStateStore` be implementing the abort handling? Seems generic.
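
A rough sketch of what that generic handling could look like (hedged: the helper name and shape are assumptions, not the actual implementation; it only relies on `StateStore.abort()`, which rolls back uncommitted updates):

```scala
// Illustrative only: abort the store if the wrapped computation fails, so each
// caller of mapPartitionsWithStateStore doesn't need its own try/catch.
def withAbortOnFailure[T](store: StateStore)(body: => T): T = {
  try {
    body
  } catch {
    case e: Throwable =>
      store.abort() // discard uncommitted state updates
      throw e
  }
}
```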





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98778114
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalState
+
+/**
+ * :: Experimental ::
+ *
+ * Wrapper class for interacting with state data in `mapGroupsWithState` 
and
+ * `flatMapGroupsWithState` operations on
+ * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]].
+ *
+ * @note Operations on state are not threadsafe.
+ *
+ * Scala example of using `State`:
+ * {{{
+ *// A mapping function that maintains an integer state for string 
keys and returns a string.
+ *def mappingFunction(key: String, value: Iterable[Int], state: 
State[Int]): Option[String] = {
+ *  // Check if state exists
+ *  if (state.exists) {
+ *val existingState = state.get  // Get the existing state
+ *val shouldRemove = ... // Decide whether to remove the 
state
+ *if (shouldRemove) {
+ *  state.remove() // Remove the state
+ *} else {
+ *  val newState = ...
+ *  state.update(newState)// Set the new state
+ *}
+ *  } else {
+ *val initialState = ...
+ *state.update(initialState)  // Set the initial state
+ *  }
+ *  ... // return something
+ *}
+ *
+ * }}}
+ *
+ * Java example of using `State`:
+ * {{{
+ *// A mapping function that maintains an integer state for string 
keys and returns a string.
+ *    Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
+ *       new Function3<String, Optional<Integer>, State<Integer>, String>() {
+ *
+ * @Override
+ * public String call(String key, Optional<Integer> value, State<Integer> state) {
+ *   if (state.exists()) {
+ * int existingState = state.get(); // Get the existing state
+ * boolean shouldRemove = ...; // Decide whether to remove the 
state
+ * if (shouldRemove) {
+ *   state.remove(); // Remove the state
+ * } else {
+ *   int newState = ...;
+ *   state.update(newState); // Set the new state
+ * }
+ *   } else {
+ * int initialState = ...; // Set the initial state
+ * state.update(initialState);
+ *   }
+ *   ... // return something
+ * }
+ *   };
+ * }}}
+ *
+ * @tparam S Type of the state
+ * @since 2.1.1
+ */
+@Experimental
+@InterfaceStability.Evolving
+trait State[S] extends LogicalState[S] {
+
+  def exists: Boolean
+
+  def get(): S
+
+  def update(newState: S): Unit
+
+  def remove(): Unit
--- End diff --

Scala doc for these, even though it's pretty obvious. In particular, I assume it's safe to call update(), get(), or remove() more than once in the function?





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98779663
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -235,3 +240,86 @@ case class StateStoreSaveExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 }
+
+
+/**
+ * Physical operator for executing streaming mapGroupsWithState.
+ */
+case class MapGroupsWithStateExec(
+func: (Any, Iterator[Any], LogicalState[Any]) => Iterator[Any],
+keyDeserializer: Expression,   // probably not needed
+valueDeserializer: Expression,
+groupingAttributes: Seq[Attribute],
+dataAttributes: Seq[Attribute],
+outputObjAttr: Attribute,
+stateId: Option[OperatorStateId],
+stateDeserializer: Expression,
+stateSerializer: Seq[NamedExpression],
+child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with 
StateStoreWriter {
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))   // is this 
ordering needed?
+
+  override protected def doExecute(): RDD[InternalRow] = {
+
--- End diff --

nit: extra newline





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98775275
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /**
+   * ::Experimental::
+   * (Scala-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
+   * For each unique group, the function will be passed the group key and 
an iterator that contains
+   * all of the elements in the group. The function can return an object 
of arbitrary type, and
+   * optionally update or remove the corresponding state. The returned 
object will form a new
+   * [[Dataset]].
+   *
+   * This function can be applied on both batch and streaming Datasets. 
With a streaming dataset,
+   * this function will be once for each in every trigger. For each key, 
the updated state from the
+   * function call in a trigger will be the state available in the 
function call in the next
--- End diff --

Any updates to the state will be stored and passed to the user-given function in subsequent batches when executed as a Streaming Query.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98778548
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
--- End diff --

Should this be in `streaming`?





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98779439
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -235,3 +240,86 @@ case class StateStoreSaveExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 }
+
+
+/**
+ * Physical operator for executing streaming mapGroupsWithState.
+ */
+case class MapGroupsWithStateExec(
+func: (Any, Iterator[Any], LogicalState[Any]) => Iterator[Any],
+keyDeserializer: Expression,   // probably not needed
+valueDeserializer: Expression,
+groupingAttributes: Seq[Attribute],
+dataAttributes: Seq[Attribute],
+outputObjAttr: Attribute,
+stateId: Option[OperatorStateId],
+stateDeserializer: Expression,
+stateSerializer: Seq[NamedExpression],
+child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with 
StateStoreWriter {
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))   // is this 
ordering needed?
+
+  override protected def doExecute(): RDD[InternalRow] = {
+
+child.execute().mapPartitionsWithStateStore[InternalRow](
--- End diff --

I don't think you need the type here?





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98777503
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalState
+
+/**
+ * :: Experimental ::
+ *
+ * Wrapper class for interacting with state data in `mapGroupsWithState` 
and
--- End diff --

I think we want to say what state we are talking about.  Something like 
"per-key state from previous invocations of the function in a StreamingQuery"





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98780383
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala ---
@@ -144,6 +145,12 @@ object ObjectOperator {
 (i: InternalRow) => proj(i).get(0, deserializer.dataType)
   }
 
+  def deserializeRowToObject(
+deserializer: Expression): InternalRow => Any = {
--- End diff --

indent; also, does this not fit on one line?





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98779599
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -235,3 +240,86 @@ case class StateStoreSaveExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 }
+
+
+/**
+ * Physical operator for executing streaming mapGroupsWithState.
+ */
+case class MapGroupsWithStateExec(
+func: (Any, Iterator[Any], LogicalState[Any]) => Iterator[Any],
+keyDeserializer: Expression,   // probably not needed
+valueDeserializer: Expression,
+groupingAttributes: Seq[Attribute],
+dataAttributes: Seq[Attribute],
+outputObjAttr: Attribute,
+stateId: Option[OperatorStateId],
+stateDeserializer: Expression,
+stateSerializer: Seq[NamedExpression],
+child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with 
StateStoreWriter {
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))   // is this 
ordering needed?
--- End diff --

Yes, the GroupedIterator relies on sorting.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98779015
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
@@ -90,6 +93,14 @@ class IncrementalExecution(
   keys,
   Some(stateId),
   child) :: Nil))
+  case MapGroupsWithStateExec(
+func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr,
--- End diff --

indent inconsistent with above.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98775825
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /**
+   * ::Experimental::
+   * (Scala-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
--- End diff --

while maintaining some user-defined state for each key.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98780164
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -184,7 +189,7 @@ case class StateStoreSaveExec(
 }
 
 // Assumption: Append mode can be done only when watermark has 
been specified
-store.remove(watermarkPredicate.get.eval)
+store.remove(watermarkPredicate.get.eval _)
--- End diff --

Why this change?





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98778823
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.State
+
+/** Internal implementation of the [[State]] interface */
+private[sql] class StateImpl[S](optionalValue: Option[S]) extends State[S] 
{
+  private var value: S = optionalValue.getOrElse(null.asInstanceOf[S])
+  private var defined: Boolean = optionalValue.isDefined
+  private var updated: Boolean = false  // whether value has been updated (but not removed)
+  private var removed: Boolean = false  // whether value has been removed
+
+  // = Public API =
+  override def exists: Boolean = {
+defined
+  }
+
+  override def get(): S = {
--- End diff --

no `()`





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98779142
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -54,6 +55,18 @@ trait StatefulOperator extends SparkPlan {
   }
 }
 
+trait StateStoreReader extends StatefulOperator {
--- End diff --

and lowercase if it contains multiple classes





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98777860
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalState
+
+/**
+ * :: Experimental ::
+ *
+ * Wrapper class for interacting with state data in `mapGroupsWithState` 
and
+ * `flatMapGroupsWithState` operations on
+ * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]].
+ *
+ * @note Operations on state are not threadsafe.
+ *
+ * Scala example of using `State`:
+ * {{{
+ *// A mapping function that maintains an integer state for string 
keys and returns a string.
+ *def mappingFunction(key: String, value: Iterable[Int], state: 
State[Int]): Option[String] = {
+ *  // Check if state exists
+ *  if (state.exists) {
+ *val existingState = state.get  // Get the existing state
+ *val shouldRemove = ... // Decide whether to remove the 
state
+ *if (shouldRemove) {
+ *  state.remove() // Remove the state
+ *} else {
+ *  val newState = ...
+ *  state.update(newState)// Set the new state
+ *}
+ *  } else {
+ *val initialState = ...
+ *state.update(initialState)  // Set the initial state
+ *  }
+ *  ... // return something
+ *}
+ *
+ * }}}
+ *
+ * Java example of using `State`:
+ * {{{
+ *// A mapping function that maintains an integer state for string 
keys and returns a string.
+ *    Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
+ *       new Function3<String, Optional<Integer>, State<Integer>, String>() {
+ *
+ * @Override
+ * public String call(String key, Optional<Integer> value, State<Integer> state) {
+ *   if (state.exists()) {
+ * int existingState = state.get(); // Get the existing state
+ * boolean shouldRemove = ...; // Decide whether to remove the 
state
+ * if (shouldRemove) {
+ *   state.remove(); // Remove the state
+ * } else {
+ *   int newState = ...;
+ *   state.update(newState); // Set the new state
+ * }
+ *   } else {
+ * int initialState = ...; // Set the initial state
+ * state.update(initialState);
+ *   }
+ *   ... // return something
+ * }
+ *   };
+ * }}}
+ *
+ * @tparam S Type of the state
+ * @since 2.1.1
+ */
+@Experimental
+@InterfaceStability.Evolving
+trait State[S] extends LogicalState[S] {
+
+  def exists: Boolean
+
+  def get(): S
--- End diff --

No `()`.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98779264
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -54,6 +55,18 @@ trait StatefulOperator extends SparkPlan {
   }
 }
 
+trait StateStoreReader extends StatefulOperator {
+  override lazy val metrics = Map(
+"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"))
+}
+
+trait StateStoreWriter extends StatefulOperator {
--- End diff --

Scala doc for both of these.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98777585
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalState
+
+/**
+ * :: Experimental ::
+ *
+ * Wrapper class for interacting with state data in `mapGroupsWithState` 
and
+ * `flatMapGroupsWithState` operations on
+ * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]].
+ *
+ * @note Operations on state are not threadsafe.
+ *
+ * Scala example of using `State`:
+ * {{{
+ *// A mapping function that maintains an integer state for string 
keys and returns a string.
+ *def mappingFunction(key: String, value: Iterable[Int], state: 
State[Int]): Option[String] = {
+ *  // Check if state exists
+ *  if (state.exists) {
+ *val existingState = state.get  // Get the existing state
+ *val shouldRemove = ... // Decide whether to remove the 
state
+ *if (shouldRemove) {
+ *  state.remove() // Remove the state
+ *} else {
+ *  val newState = ...
+ *  state.update(newState)// Set the new state
+ *}
+ *  } else {
+ *val initialState = ...
+ *state.update(initialState)  // Set the initial state
+ *  }
+ *  ... // return something
+ *}
+ *
+ * }}}
+ *
+ * Java example of using `State`:
+ * {{{
+ *// A mapping function that maintains an integer state for string 
keys and returns a string.
+ *    Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
+ *       new Function3<String, Optional<Integer>, State<Integer>, String>() {
+ *
+ * @Override
+ * public String call(String key, Optional<Integer> value, State<Integer> state) {
+ *   if (state.exists()) {
+ * int existingState = state.get(); // Get the existing state
+ * boolean shouldRemove = ...; // Decide whether to remove the 
state
+ * if (shouldRemove) {
+ *   state.remove(); // Remove the state
+ * } else {
+ *   int newState = ...;
+ *   state.update(newState); // Set the new state
+ * }
+ *   } else {
+ * int initialState = ...; // Set the initial state
+ * state.update(initialState);
+ *   }
+ *   ... // return something
+ * }
+ *   };
+ * }}}
+ *
+ * @tparam S Type of the state
+ * @since 2.1.1
+ */
+@Experimental
+@InterfaceStability.Evolving
+trait State[S] extends LogicalState[S] {
--- End diff --

`KeyState`?  State just feels very generic.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98778773
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.State
+
+/** Internal implementation of the [[State]] interface */
+private[sql] class StateImpl[S](optionalValue: Option[S]) extends State[S] 
{
--- End diff --

I would consider using `null` here to avoid extra allocations in the 
critical path.
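
For reference, a minimal sketch of the null-based variant suggested above, against the `State[S]` trait from this PR. This is illustrative only, not the actual implementation; the constructor shape and the missing `updated`/`removed` bookkeeping are simplifications.

```scala
// Illustrative only: track definedness with a flag instead of wrapping the
// value in an Option, avoiding one allocation per key on the critical path.
private[sql] class StateImpl[S](initialValue: S, initiallyDefined: Boolean) extends State[S] {
  private var value: S = initialValue
  private var defined: Boolean = initiallyDefined

  override def exists: Boolean = defined

  override def get(): S = {
    if (!defined) throw new NoSuchElementException("State is not set")
    value
  }

  override def update(newState: S): Unit = {
    value = newState
    defined = true
  }

  override def remove(): Unit = {
    value = null.asInstanceOf[S]
    defined = false
  }
}
```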





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98776628
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /**
+   * ::Experimental::
+   * (Scala-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
+   * For each unique group, the function will be passed the group key and 
an iterator that contains
+   * all of the elements in the group. The function can return an object 
of arbitrary type, and
+   * optionally update or remove the corresponding state. The returned 
object will form a new
+   * [[Dataset]].
+   *
+   * This function can be applied on both batch and streaming Datasets. 
With a streaming dataset,
+   * this function will be once for each in every trigger. For each key, 
the updated state from the
+   * function call in a trigger will be the state available in the 
function call in the next
+   * trigger. However, for batch, `mapGroupsWithState` behaves exactly as 
`mapGroups` and the
+   * function is called only once per key without any prior state.
+   *
+   * There is no guaranteed ordering of values in the iterator in the 
function.
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @see [[State]] for more details of how to update/remove state in the 
function.
+   * @since 2.1.1
+   */
+  @Experimental
+  @InterfaceStability.Evolving
+  def mapGroupsWithState[STATE: Encoder, OUT: Encoder](
--- End diff --

I think it would be useful to come up with an example use case since this 
is pretty complicated.
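
One possible example, sketched under the API proposed in this PR: a running event count per key, carried across triggers. The `Event` class, the data, and the `spark` session are hypothetical stand-ins.

```scala
import spark.implicits._ // assumes an existing SparkSession named `spark`

case class Event(user: String, action: String)
val events = Seq(Event("a", "click"), Event("a", "view"), Event("b", "click")).toDS()

val counts = events
  .groupByKey(_.user)
  .mapGroupsWithState[Int, (String, Int)] { (user, iter, state) =>
    val previous = if (state.exists) state.get else 0 // prior state; empty on first call
    val newCount = previous + iter.size
    state.update(newCount)                            // carried to the next trigger for this key
    (user, newCount)
  }
```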





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98777095
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /**
+   * ::Experimental::
+   * (Scala-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
+   * For each unique group, the function will be passed the group key and 
an iterator that contains
+   * all of the elements in the group. The function can return an object 
of arbitrary type, and
+   * optionally update or remove the corresponding state. The returned 
object will form a new
+   * [[Dataset]].
+   *
+   * This function can be applied on both batch and streaming Datasets. 
With a streaming dataset,
+   * this function will be once for each in every trigger. For each key, 
the updated state from the
+   * function call in a trigger will be the state available in the 
function call in the next
+   * trigger. However, for batch, `mapGroupsWithState` behaves exactly as 
`mapGroups` and the
+   * function is called only once per key without any prior state.
+   *
+   * There is no guaranteed ordering of values in the iterator in the 
function.
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @see [[State]] for more details of how to update/remove state in the 
function.
+   * @since 2.1.1
+   */
+  @Experimental
+  @InterfaceStability.Evolving
+  def mapGroupsWithState[STATE: Encoder, OUT: Encoder](
+  func: (K, Iterator[V], State[STATE]) => OUT): Dataset[OUT] = {
+val f = (key: K, it: Iterator[V], s: State[STATE]) => 
Iterator(func(key, it, s))
+flatMapGroupsWithState[STATE, OUT](f)
+  }
+
+  /**
+   * ::Experimental::
+   * (Java-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
+   * For each unique group, the function will be passed the group key and 
an iterator that contains
+   * all of the elements in the group. The function can return an object 
of arbitrary type, and
+   * optionally update or remove the corresponding state. The returned 
object will form a new
+   * [[Dataset]].
+   *
+   * This function can be applied on both batch and streaming Datasets. 
With a streaming dataset,
+   * this function will be invoked once for each group in every trigger. For each key, 
the updated state from the
+   * function call in a trigger will be the state available in the 
function call in the next
+   * trigger. However, for batch, `mapGroupsWithState` behaves exactly as 
`mapGroups` and the
+   * function is called only once per key without any prior state.
+   *
+   * There is no guaranteed ordering of values in the iterator in the 
function.
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @see [[State]] for more details of how to update/remove state in the 
function.
+   * @since 2.1.1
+   */
+  @Experimental
+  @InterfaceStability.Evolving
+  def mapGroupsWithState[STATE, OUT](
--- End diff --

Since this is pretty complicated and we might iterate over time, I wonder 
if we shouldn't just put the full explanation in `State` and link there with a 
very brief version for each function.  Not sure...





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98776538
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /**
+   * ::Experimental::
+   * (Scala-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
+   * For each unique group, the function will be passed the group key and 
an iterator that contains
+   * all of the elements in the group. The function can return an object 
of arbitrary type, and
+   * optionally update or remove the corresponding state. The returned 
object will form a new
+   * [[Dataset]].
+   *
+   * This function can be applied on both batch and streaming Datasets. 
With a streaming dataset,
+   * this function will be invoked once for each group in every trigger. For each key, 
the updated state from the
+   * function call in a trigger will be the state available in the 
function call in the next
+   * trigger. However, for batch, `mapGroupsWithState` behaves exactly as 
`mapGroups` and the
+   * function is called only once per key without any prior state.
+   *
+   * There is no guaranteed ordering of values in the iterator in the 
function.
--- End diff --

I'd maybe put these into bullets as well.





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98778649
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/StateImpl.scala ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.State
+
+/** Internal implementation of the [[State]] interface */
+private[sql] class StateImpl[S](optionalValue: Option[S]) extends State[S] 
{
+  private var value: S = optionalValue.getOrElse(null.asInstanceOf[S])
+  private var defined: Boolean = optionalValue.isDefined
+  private var updated: Boolean = false  // whether value has been updated 
(but not removed)
+  private var removed: Boolean = false  // whether value has eben removed
--- End diff --

"been"





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98776799
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /**
+   * ::Experimental::
+   * (Scala-specific)
+   * Applies the given function to each group of data, while using an 
additional keyed state.
+   * For each unique group, the function will be passed the group key and 
an iterator that contains
+   * all of the elements in the group. The function can return an object 
of arbitrary type, and
+   * optionally update or remove the corresponding state. The returned 
object will form a new
+   * [[Dataset]].
+   *
+   * This function can be applied on both batch and streaming Datasets. 
With a streaming dataset,
+   * this function will be invoked once for each group in every trigger. For each key, 
the updated state from the
+   * function call in a trigger will be the state available in the 
function call in the next
+   * trigger. However, for batch, `mapGroupsWithState` behaves exactly as 
`mapGroups` and the
+   * function is called only once per key without any prior state.
+   *
+   * There is no guaranteed ordering of values in the iterator in the 
function.
+   *
+   * This function does not support partial aggregation, and as a result 
requires shuffling all
+   * the data in the [[Dataset]].
+   *
+   * Internally, the implementation will spill to disk if any given group 
is too large to fit into
+   * memory.  However, users must take care to avoid materializing the 
whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is 
possible given the memory
+   * constraints of their cluster.
+   *
+   * @see [[State]] for more details of how to update/remove state in the 
function.
+   * @since 2.1.1
+   */
+  @Experimental
+  @InterfaceStability.Evolving
+  def mapGroupsWithState[STATE: Encoder, OUT: Encoder](
--- End diff --

I would also stick with `S` and `U` to match other functions in the class.
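
That is, presumably something like:

```scala
def mapGroupsWithState[S: Encoder, U: Encoder](
    func: (K, Iterator[V], State[S]) => U): Dataset[U]
```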





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98777806
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalState
+
+/**
+ * :: Experimental ::
+ *
+ * Wrapper class for interacting with state data in `mapGroupsWithState` 
and
+ * `flatMapGroupsWithState` operations on
+ * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]].
+ *
+ * @note Operations on state are not threadsafe.
+ *
+ * Scala example of using `State`:
+ * {{{
+ *// A mapping function that maintains an integer state for string 
keys and returns a string.
+ *def mappingFunction(key: String, value: Iterable[Int], state: 
State[Int]): Option[String] = {
+ *  // Check if state exists
+ *  if (state.exists) {
+ *val existingState = state.get  // Get the existing state
+ *val shouldRemove = ... // Decide whether to remove the 
state
+ *if (shouldRemove) {
+ *  state.remove() // Remove the state
+ *} else {
+ *  val newState = ...
+ *  state.update(newState)// Set the new state
+ *}
+ *  } else {
+ *val initialState = ...
+ *state.update(initialState)  // Set the initial state
+ *  }
+ *  ... // return something
+ *}
+ *
+ * }}}
+ *
+ * Java example of using `State`:
+ * {{{
+ *// A mapping function that maintains an integer state for string 
keys and returns a string.
+ *Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
+ *   new Function3<String, Optional<Integer>, State<Integer>, String>() {
+ *
+ * @Override
+ * public String call(String key, Optional<Integer> value, State<Integer> state) {
+ *   if (state.exists()) {
+ * int existingState = state.get(); // Get the existing state
+ * boolean shouldRemove = ...; // Decide whether to remove the 
state
+ * if (shouldRemove) {
+ *   state.remove(); // Remove the state
+ * } else {
+ *   int newState = ...;
+ *   state.update(newState); // Set the new state
+ * }
+ *   } else {
+ * int initialState = ...; // Set the initial state
+ * state.update(initialState);
+ *   }
+ *   ... // return something
+ * }
+ *   };
+ * }}}
+ *
+ * @tparam S Type of the state
--- End diff --

A user defined type that can be stored for each key.  Must be encodable?





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98777964
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/State.scala ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalState
+
+/**
+ * :: Experimental ::
+ *
+ * Wrapper class for interacting with state data in `mapGroupsWithState` 
and
+ * `flatMapGroupsWithState` operations on
+ * [[org.apache.spark.sql.KeyValueGroupedDataset KeyValueGroupedDataset]].
+ *
+ * @note Operations on state are not threadsafe.
+ *
+ * Scala example of using `State`:
+ * {{{
+ *// A mapping function that maintains an integer state for string 
keys and returns a string.
+ *def mappingFunction(key: String, value: Iterable[Int], state: 
State[Int]): Option[String] = {
+ *  // Check if state exists
+ *  if (state.exists) {
+ *val existingState = state.get  // Get the existing state
+ *val shouldRemove = ... // Decide whether to remove the 
state
+ *if (shouldRemove) {
+ *  state.remove() // Remove the state
+ *} else {
+ *  val newState = ...
+ *  state.update(newState)// Set the new state
+ *}
+ *  } else {
+ *val initialState = ...
+ *state.update(initialState)  // Set the initial state
+ *  }
+ *  ... // return something
+ *}
+ *
+ * }}}
+ *
+ * Java example of using `State`:
+ * {{{
+ *// A mapping function that maintains an integer state for string 
keys and returns a string.
+ *Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
+ *   new Function3<String, Optional<Integer>, State<Integer>, String>() {
+ *
+ * @Override
+ * public String call(String key, Optional<Integer> value, State<Integer> state) {
+ *   if (state.exists()) {
+ * int existingState = state.get(); // Get the existing state
+ * boolean shouldRemove = ...; // Decide whether to remove the 
state
+ * if (shouldRemove) {
+ *   state.remove(); // Remove the state
+ * } else {
+ *   int newState = ...;
+ *   state.update(newState); // Set the new state
+ * }
+ *   } else {
+ * int initialState = ...; // Set the initial state
+ * state.update(initialState);
+ *   }
+ *   ... // return something
+ * }
+ *   };
+ * }}}
+ *
+ * @tparam S Type of the state
+ * @since 2.1.1
+ */
+@Experimental
+@InterfaceStability.Evolving
+trait State[S] extends LogicalState[S] {
+
+  def exists: Boolean
+
+  def get(): S
+
+  def update(newState: S): Unit
+
+  def remove(): Unit
+
+  @inline final def getOption(): Option[S] = if (exists) Some(get()) else 
None
--- End diff --

No `()`.
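
That is, presumably:

```scala
@inline final def getOption: Option[S] = if (exists) Some(get()) else None
```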





[GitHub] spark pull request #14725: [SPARK-17161] [PYSPARK][ML] Add PySpark-ML JavaWr...

2017-01-31 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/14725#discussion_r98780626
  
--- Diff: python/pyspark/ml/wrapper.py ---
@@ -16,6 +16,10 @@
 #
 
 from abc import ABCMeta, abstractmethod
+import sys
--- End diff --

That looks fine; we use xrange similarly elsewhere.





[GitHub] spark pull request #16722: [SPARK-9478][ML][MLlib] Add sample weights to dec...

2017-01-31 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/16722#discussion_r98778763
  
--- Diff: 
mllib-local/src/test/scala/org/apache/spark/ml/util/TestingUtils.scala ---
@@ -48,7 +48,7 @@ object TestingUtils {
   /**
* Private helper function for comparing two values using absolute 
tolerance.
*/
-  private def AbsoluteErrorComparison(x: Double, y: Double, eps: Double): 
Boolean = {
+  private[ml] def AbsoluteErrorComparison(x: Double, y: Double, eps: 
Double): Boolean = {
--- End diff --

It's not used. I just changed the scope of both methods; I can change it 
back, of course. I don't see a great reason to make this public, since most users 
will use `relTol` instead. I'm open to other opinions, though.





[GitHub] spark pull request #16696: [SPARK-19350] [SQL] Cardinality estimation of Lim...

2017-01-31 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16696#discussion_r98776491
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -727,37 +728,18 @@ case class GlobalLimit(limitExpr: Expression, child: 
LogicalPlan) extends UnaryN
   }
   override def computeStats(conf: CatalystConf): Statistics = {
 val limit = limitExpr.eval().asInstanceOf[Int]
-val sizeInBytes = if (limit == 0) {
-  // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also 
be zero
-  // (product of children).
-  1
-} else {
-  (limit: Long) * output.map(a => a.dataType.defaultSize).sum
-}
-child.stats(conf).copy(sizeInBytes = sizeInBytes)
+val childStats = child.stats(conf)
+// Don't propagate column stats, because we don't know the 
distribution after a limit operation
+Statistics(
+  sizeInBytes = EstimationUtils.getOutputSize(output, limit, 
childStats.attributeStats),
--- End diff --

Agreed.  We can pick the smaller value between the child node's row count 
and the limit number. 
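
A minimal sketch of that suggestion, assuming `childStats.rowCount` is an 
`Option[BigInt]` as in Catalyst's `Statistics`:

```scala
// Cap the estimated row count at the limit; fall back to the limit
// when the child's row count is unknown.
val rowCount: BigInt = childStats.rowCount
  .map(_.min(BigInt(limit)))
  .getOrElse(BigInt(limit))
```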





[GitHub] spark issue #16666: [SPARK-19319][SparkR]:SparkR Kmeans summary returns erro...

2017-01-31 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16666
  
**[Test build #72207 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72207/testReport)**
 for PR 16666 at commit 
[`2110536`](https://github.com/apache/spark/commit/2110536cab25ff690bf60ad28d08d69cd648e97c).





[GitHub] spark issue #16666: [SPARK-19319][SparkR]:SparkR Kmeans summary returns erro...

2017-01-31 Thread wangmiao1981
Github user wangmiao1981 commented on the issue:

https://github.com/apache/spark/pull/16666
  
ping @felixcheung 





[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...

2017-01-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16757
  
Merged build finished. Test PASSed.





[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...

2017-01-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16757
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72203/
Test PASSed.





[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...

2017-01-31 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16757
  
**[Test build #72203 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72203/testReport)**
 for PR 16757 at commit 
[`6aad5d8`](https://github.com/apache/spark/commit/6aad5d844b26f675cf13d3c898da785271ffcc7c).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request #16730: [SPARK-19395][SparkR]Convert coefficients in summ...

2017-01-31 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16730





[GitHub] spark issue #16730: [SPARK-19395][SparkR]Convert coefficients in summary to ...

2017-01-31 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/16730
  
merged to master, thanks!






[GitHub] spark issue #16740: [SPARK-19400][ML] Allow GLM to handle intercept only mod...

2017-01-31 Thread sethah
Github user sethah commented on the issue:

https://github.com/apache/spark/pull/16740
  
Allowing offset will only require a small change to the intercept 
calculation, won't it?

```scala
val agg = data.agg(sum(w * (col("label") - col("offset"))), sum(w)).first()
link.link(agg.getDouble(0) / agg.getDouble(1))
```

I'm still in favor of fixing the IRLS bug, but we should be able to return 
an analytic result without too much trouble, unless I'm missing something.





[GitHub] spark pull request #16729: [SPARK-19391][SparkR][ML] Tweedie GLM API for Spa...

2017-01-31 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/16729#discussion_r98760363
  
--- Diff: R/pkg/R/mllib_regression.R ---
@@ -53,12 +53,19 @@ setClass("IsotonicRegressionModel", representation(jobj 
= "jobj"))
 #'   the result of a call to a family function. Refer R family 
at
 #'   
\url{https://stat.ethz.ch/R-manual/R-devel/library/stats/html/family.html}.
 #'   Currently these families are supported: \code{binomial}, 
\code{gaussian},
-#'   \code{Gamma}, and \code{poisson}.
+#'   \code{Gamma}, \code{poisson} and \code{"tweedie"}.
+#'   The tweedie family must be specified using character 
string \code{"tweedie"}. The family
+#'   function \code{tweedie} in the \code{statmod} package is 
not supported.
--- End diff --

I prefer not having a hard dependency on an external package in R, but if 
statmod is present, could we support `family = tweedie` (not a string) too?





[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98749100
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
 ---
@@ -135,7 +136,24 @@ private[kafka010] class KafkaSourceRDD(
 } else {
   new NextIterator[ConsumerRecord[Array[Byte], Array[Byte]]]() {
 val consumer = CachedKafkaConsumer.getOrCreate(
-  range.topic, range.partition, executorKafkaParams)
+range.topic, range.partition, executorKafkaParams, 
reuseKafkaConsumer)
+if (range.fromOffset < 0 || range.untilOffset < 0) {
+  // Late bind the offset range
+  val fromOffset = if (range.fromOffset < 0) {
+
consumer.rawConsumer.seekToBeginning(ju.Arrays.asList(range.topicPartition))
+consumer.rawConsumer.position(range.topicPartition)
+  } else {
+range.fromOffset
+  }
+  val untilOffset = if (range.untilOffset < 0) {
+
consumer.rawConsumer.seekToEnd(ju.Arrays.asList(range.topicPartition))
--- End diff --

nit: add assert(range.fromOffset == -1) to avoid breaking it in future.





[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98746133
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
@@ -249,6 +249,24 @@ class KafkaTestUtils extends Logging {
 offsets
   }
 
+  def cleanupLogs(): Unit = {
+server.logManager.cleanupLogs()
+  }
+
+  def getEarliestOffsets(topics: Set[String]): Map[TopicPartition, Long] = 
{
+val kc = new KafkaConsumer[String, String](consumerConfiguration)
+logInfo("Created consumer to get latest offsets")
--- End diff --

nit: please fix the log





[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98746146
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
@@ -249,6 +249,24 @@ class KafkaTestUtils extends Logging {
 offsets
   }
 
+  def cleanupLogs(): Unit = {
+server.logManager.cleanupLogs()
+  }
+
+  def getEarliestOffsets(topics: Set[String]): Map[TopicPartition, Long] = 
{
+val kc = new KafkaConsumer[String, String](consumerConfiguration)
+logInfo("Created consumer to get latest offsets")
+kc.subscribe(topics.asJavaCollection)
+kc.poll(0)
+val partitions = kc.assignment()
+kc.pause(partitions)
+kc.seekToBeginning(partitions)
+val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap
+kc.close()
+logInfo("Closed consumer to get latest offsets")
--- End diff --

nit: please fix the log





[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98746070
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
@@ -274,6 +292,11 @@ class KafkaTestUtils extends Logging {
 props.put("log.flush.interval.messages", "1")
 props.put("replica.socket.timeout.ms", "1500")
 props.put("delete.topic.enable", "true")
+withBrokerProps.map { p =>
--- End diff --

nit: you can change the type of `withBrokerProps` to `Map[String, Object]`. 
Then here you can just use `props.putAll(withBrokerProps.asJava)`.





[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98756883
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -108,7 +113,14 @@ private[kafka010] class KafkaOffsetReaderImpl(
 
   def close(): Unit = consumer.close()
 
-  def fetchSpecificStartingOffsets(
+  override def fetchTopicPartitions(): Set[TopicPartition] = {
+assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+// Poll to get the latest assigned partitions
+consumer.poll(0)
+consumer.assignment().asScala.toSet
--- End diff --

nit: please also call `pause` like this to avoid fetching the real data 
when reusing the relation.
```
val partitions = consumer.assignment()
consumer.pause(partitions)
partitions.asScala.toSet
```





[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98749048
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
 ---
@@ -135,7 +136,24 @@ private[kafka010] class KafkaSourceRDD(
 } else {
   new NextIterator[ConsumerRecord[Array[Byte], Array[Byte]]]() {
 val consumer = CachedKafkaConsumer.getOrCreate(
-  range.topic, range.partition, executorKafkaParams)
+range.topic, range.partition, executorKafkaParams, 
reuseKafkaConsumer)
+if (range.fromOffset < 0 || range.untilOffset < 0) {
+  // Late bind the offset range
+  val fromOffset = if (range.fromOffset < 0) {
+
consumer.rawConsumer.seekToBeginning(ju.Arrays.asList(range.topicPartition))
--- End diff --

nit: add `assert(range.fromOffset == -2)` to avoid breaking it in future.





[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

2017-01-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16758
  
Merged build finished. Test FAILed.





[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

2017-01-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16758
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72206/
Test FAILed.





[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

2017-01-31 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16758
  
**[Test build #72206 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72206/testReport)**
 for PR 16758 at commit 
[`8be63de`](https://github.com/apache/spark/commit/8be63de507f9e4fc258aca1d467509e06b674565).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

2017-01-31 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16758
  
**[Test build #72206 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72206/testReport)**
 for PR 16758 at commit 
[`8be63de`](https://github.com/apache/spark/commit/8be63de507f9e4fc258aca1d467509e06b674565).





[GitHub] spark issue #16751: [SPARK-19409][BUILD] Bump parquet version to 1.8.2

2017-01-31 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/16751
  
can you put the rest of the cleanups in one place?






[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2017-01-31 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r98751522
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -933,6 +936,47 @@ object CombineLimits extends Rule[LogicalPlan] {
 }
 
 /**
+ * Check if there are any cartesian products between joins of any type in the 
optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit 
cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is 
true.
+ *
+ * This rule must be run AFTER the ReorderJoin rule since the join 
conditions for each join must be
+ * collected before checking if it is a cartesian product. If you have
+ * SELECT * from R, S where R.r = S.s,
+ * the join between R and S is not a cartesian product and therefore 
should be allowed.
+ * The predicate R.r = S.s is not recognized as a join condition until the 
ReorderJoin rule.
+ */
+case class CheckCartesianProducts(conf: CatalystConf)
--- End diff --

If we do it in the optimizer, it does not cover all the cases, right? 
Cartesian Product might be chosen based on the statistics.





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2017-01-31 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r98751322
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -933,6 +936,47 @@ object CombineLimits extends Rule[LogicalPlan] {
 }
 
 /**
+ * Check if there are any cartesian products between joins of any type in the 
optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit 
cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is 
true.
+ *
+ * This rule must be run AFTER the ReorderJoin rule since the join 
conditions for each join must be
+ * collected before checking if it is a cartesian product. If you have
+ * SELECT * from R, S where R.r = S.s,
+ * the join between R and S is not a cartesian product and therefore 
should be allowed.
+ * The predicate R.r = S.s is not recognized as a join condition until the 
ReorderJoin rule.
+ */
+case class CheckCartesianProducts(conf: CatalystConf)
--- End diff --

If we do it in the optimizer, it does not cover all the cases, right? 
Cartesian Product might be chosen based on the statistics.






[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2017-01-31 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r98750834
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -933,6 +936,47 @@ object CombineLimits extends Rule[LogicalPlan] {
 }
 
 /**
+ * Check if there are any cartesian products between joins of any type in the 
optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit 
cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is 
true.
+ *
+ * This rule must be run AFTER the ReorderJoin rule since the join 
conditions for each join must be
+ * collected before checking if it is a cartesian product. If you have
+ * SELECT * from R, S where R.r = S.s,
+ * the join between R and S is not a cartesian product and therefore 
should be allowed.
+ * The predicate R.r = S.s is not recognized as a join condition until the 
ReorderJoin rule.
+ */
+case class CheckCartesianProducts(conf: CatalystConf)
--- End diff --

If we do it here, it does not cover all the cases, right? `Cartesian 
Product` might be chosen based on the statistics. 





[GitHub] spark pull request #16746: [SPARK-15648][SQL] Add teradataDialect for JDBC c...

2017-01-31 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/16746#discussion_r98751002
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Types
+import org.apache.spark.sql.types._
+
+
+private case object TeradataDialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = { 
url.startsWith("jdbc:teradata") }
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+case StringType => Some(JdbcType("VARCHAR(255)", 
java.sql.Types.VARCHAR))
+case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
+case _ => None
+  }
--- End diff --

What about `isCascadingTruncateTable`? Could you check whether Teradata 
truncates cascadingly by default for the `TRUNCATE TABLE` statement?
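
For reference, a minimal sketch of such an override (the `Some(false)` value 
is only an assumption and would need to be verified against Teradata's 
documentation):

```scala
// Sketch only; confirm Teradata's actual TRUNCATE semantics first.
override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
```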





[GitHub] spark pull request #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

2017-01-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16758#discussion_r98748723
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -54,6 +55,18 @@ trait StatefulOperator extends SparkPlan {
   }
 }
 
+trait StateStoreReader extends StatefulOperator {
--- End diff --

This file should probably be renamed from StatefulAggregation to 
StatefulOperations.





[GitHub] spark pull request #16746: [SPARK-15648][SQL] Add teradataDialect for JDBC c...

2017-01-31 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/16746#discussion_r98750209
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Types
+import org.apache.spark.sql.types._
+
+
+private case object TeradataDialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = { 
url.startsWith("jdbc:teradata") }
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+case StringType => Some(JdbcType("VARCHAR(255)", 
java.sql.Types.VARCHAR))
+case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
+case _ => None
+  }
--- End diff --

+1





[GitHub] spark issue #15237: [SPARK-17663] [CORE] SchedulableBuilder should handle in...

2017-01-31 Thread erenavsarogullari
Github user erenavsarogullari commented on the issue:

https://github.com/apache/spark/pull/15237
  
Hi @squito,

Sorry, I am quite busy this week, and I am happy to have this PR merged now if 
that is also OK for you. I plan to address fileName logging via a separate JIRA.

Also, adding logging to inform the user in the following cases could be useful:
- if the `schedulerAllocFile` property is not set and `DEFAULT_SCHEDULER_FILE` 
is used,
- if the `schedulerAllocFile` property is not set and `DEFAULT_SCHEDULER_FILE` 
does not exist,
- if `schedulerAllocFile` does not exist.

So I am thinking of creating a single JIRA for the `FairSchedulableBuilder` 
logging improvement, covering both fileName logging (if the file is found 
successfully) and the failure cases above, as sketched below.
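
A minimal sketch of what such logging could look like (the method name and 
messages here are hypothetical, and `println` stands in for Spark's 
`logInfo`/`logWarning`):

```scala
import java.io.File

// Hypothetical sketch only -- not the actual FairSchedulableBuilder code.
def logSchedulerFileChoice(schedulerAllocFile: Option[String]): Unit = {
  val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
  schedulerAllocFile match {
    case Some(f) if new File(f).isFile =>
      println(s"Creating fair scheduler pools from $f")
    case Some(f) =>
      println(s"Fair scheduler allocation file does not exist: $f")
    case None if new File(DEFAULT_SCHEDULER_FILE).isFile =>
      println(s"schedulerAllocFile is not set, using default $DEFAULT_SCHEDULER_FILE")
    case None =>
      println(s"schedulerAllocFile is not set and default $DEFAULT_SCHEDULER_FILE does not exist")
  }
}
```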





[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

2017-01-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16758
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72205/
Test FAILed.





[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

2017-01-31 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16758
  
**[Test build #72205 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72205/testReport)**
 for PR 16758 at commit 
[`6fab7a5`](https://github.com/apache/spark/commit/6fab7a5fde75309198d1e73e66948d82d0f590e6).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

2017-01-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16758
  
Merged build finished. Test FAILed.





[GitHub] spark pull request #16690: [SPARK-19347] ReceiverSupervisorImpl can add bloc...

2017-01-31 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16690#discussion_r98747972
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala ---
@@ -75,10 +106,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
    * @tparam T type of the reply message
    * @return the reply message from the corresponding [[RpcEndpoint]]
    */
+  @deprecated("use 'askSync' instead.", "2.1.0")
--- End diff --

2.2.0





[GitHub] spark pull request #16690: [SPARK-19347] ReceiverSupervisorImpl can add bloc...

2017-01-31 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16690#discussion_r98748039
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala ---
@@ -19,6 +19,7 @@ package org.apache.spark.rpc
 
 import scala.concurrent.Future
 import scala.reflect.ClassTag
+import scala.util.control.NonFatal
--- End diff --

not used





[GitHub] spark pull request #16690: [SPARK-19347] ReceiverSupervisorImpl can add bloc...

2017-01-31 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16690#discussion_r98747902
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala ---
@@ -91,6 +123,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
    * @tparam T type of the reply message
    * @return the reply message from the corresponding [[RpcEndpoint]]
    */
+  @deprecated("use 'askSync' instead.", "2.1.0")
--- End diff --

2.2.0





[GitHub] spark issue #16751: [SPARK-19409][BUILD] Bump parquet version to 1.8.2

2017-01-31 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/16751
  
Ya, @mallman .

However, for the same reason, I concluded to keep them out of this PR; exactly the opposite direction from your opinion. If we try to fix all of them in a single shot, it will not be merged for a long time.

At least you can see that #16756 has already started. I think those workarounds are going to be cleaned up soon, if these commits are not reverted for some reason. :)





[GitHub] spark issue #16758: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

2017-01-31 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16758
  
**[Test build #72205 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72205/testReport)**
 for PR 16758 at commit 
[`6fab7a5`](https://github.com/apache/spark/commit/6fab7a5fde75309198d1e73e66948d82d0f590e6).





[GitHub] spark pull request #16758: Arbitrary stateful operations with MapGroupsWithS...

2017-01-31 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/16758

Arbitrary stateful operations with MapGroupsWithState

## What changes were proposed in this pull request?

`mapGroupsWithState` is a new API for arbitrary stateful operations in 
Structured Streaming, similar to `DStream.mapWithState` 

*Requirements*
- Users should be able to specify a function that can do the following
  - Access the input row corresponding to a key
  - Access the previous state corresponding to a key
  - Optionally, update or remove the state
  - Output any number of new rows (or none at all)

*Proposed API*
```
// ---- New methods on KeyValueGroupedDataset ----
class KeyValueGroupedDataset[K, V] {
  // Scala friendly
  def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], State[S]) => U)
  def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], State[S]) => Iterator[U])
  // Java friendly
  def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U],
    stateEncoder: Encoder[S], resultEncoder: Encoder[U])
  def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, U],
    stateEncoder: Encoder[S], resultEncoder: Encoder[U])
}

// ---- New Java-friendly function classes ----
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  R call(K key, Iterator<V> values, State<S> state) throws Exception;
}
public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  Iterator<R> call(K key, Iterator<V> values, State<S> state) throws Exception;
}

// ---- Wrapper class for state data ----
trait State[S] {
  def exists(): Boolean
  def get(): S                  // throws an Exception if the state does not exist
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit            // exists() will be false after this
}
```

*Key Semantics of the State class*
- The state can be null.
- If state.remove() is called, then state.exists() will return false, and getOption will return None.
- After state.update(newState) is called, state.exists() will return true, and getOption will return Some(...).
- None of the operations are thread-safe. This is to avoid memory barriers.

*Usage*
```
val stateFunc = (word: String, words: Iterator[String], runningCount: State[Long]) => {
  val newCount = words.size + runningCount.getOption.getOrElse(0L)
  runningCount.update(newCount)
  (word, newCount)
}

dataset                                                 // type is Dataset[String]
  .groupByKey[String](w => w)                           // generates KeyValueGroupedDataset[String, String]
  .mapGroupsWithState[Long, (String, Long)](stateFunc)  // returns Dataset[(String, Long)]
```
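
For completeness, a rough sketch of how the `flatMapGroupsWithState` variant could be used when a key should emit zero or more output rows (illustrative usage of the proposed API above, not tested code):

```
val flatStateFunc = (word: String, words: Iterator[String], runningCount: State[Long]) => {
  val batchSize = words.size
  if (batchSize == 0) {
    Iterator.empty  // emit nothing for this key
  } else {
    val newCount = batchSize + runningCount.getOption.getOrElse(0L)
    runningCount.update(newCount)
    Iterator.single((word, newCount))
  }
}

dataset
  .groupByKey[String](w => w)
  .flatMapGroupsWithState[Long, (String, Long)](flatStateFunc)  // returns Dataset[(String, Long)]
```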

## How was this patch tested?
New unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark mapWithState

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16758


commit d10da2700a970c82537b678db34b2d80cebebcc8
Author: Tathagata Das 
Date:   2017-01-18T00:45:54Z

Prototype - almost working

commit 78cd1853033a091b62cb350879bb6c3a0b6c8641
Author: Tathagata Das 
Date:   2017-01-18T01:05:51Z

Renamed to mapGroupsWithState

commit 0c22e08a8f9ad66a49bc939652fee14577f9bd4b
Author: Tathagata Das 
Date:   2017-01-18T01:56:07Z

Fixed bugs

commit 52e14e479ffa1e38c9efc5b063a95831caab6997
Author: Tathagata Das 
Date:   2017-01-18T10:52:20Z

Removed prints

commit 529aefe6d7cb9cd54e20c0cdaa11cec90a4f16be
Author: Tathagata Das 
Date:   2017-01-18T10:58:27Z

Test state remove

commit 57f5e8d2e8a74a8269667a9d4d89971eb9107c07
Author: Tathagata Das 
Date:   2017-01-18T20:26:53Z

Test restart, and test with metrics

commit 3e0d8dcfa81d58ae6ca6754cc54c19383179802a
Author: Tathagata Das 
Date:   2017-01-21T02:25:29Z

Fixed everything

commit b54fa230eda141316713e3b1d1c56d8a28fd3a6c
Author: Tathagata Das 
Date:   2017-01-30T02:06:00Z

Refactored, added java APIs and tests

commit ab3cb6c961f0d861a24c2146dcb9dc0380c8adc9
Author: Tathagata Das 
Date:   2017-01-30T05:07:19Z

Refactored

commit ddf4550b765af89a4ed7d80edabfe3370cbd1e23
Author: Tathagata Das 
Date:   2017-01-30T21:29:44Z

Added more test

commit 3133f83195ca562e866f6565cdfbd59e118cb6aa
Author: Tathagata Das 
Date:   2017-01-31T18:53:58Z

Merge remote-tracking bran

[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-31 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r98743557
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
@@ -144,23 +148,53 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // spilling, avoid to have too many spilled files.
       if (got < required) {
         // Call spill() on other consumers to release memory
+        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
+        // which is just spilled in last few times and re-spilling on it will produce many small
+        // spill files.
+        TreeMap<Long, List<MemoryConsumer>> sortedConsumers =
+          new TreeMap<Long, List<MemoryConsumer>>();
         for (MemoryConsumer c: consumers) {
           if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            try {
-              long released = c.spill(required - got, consumer);
-              if (released > 0) {
-                logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                  Utils.bytesToString(released), c, consumer);
-                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-                if (got >= required) {
-                  break;
-                }
+            long key = c.getUsed();
+            List<MemoryConsumer> list = null;
--- End diff --

```
list = sorted.get(key);
if (list == null) {
  // instantiate, add to map
}
list.add(...);
```

This is slightly cheaper and shorter than your code.
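
Spelled out, the get-or-create idiom looks roughly like this (a Scala sketch against `java.util.TreeMap` purely for illustration; the actual file is Java, and `addByUsage` is a hypothetical helper name):

```
import java.util.{ArrayList, TreeMap}

// Get-or-create: one map lookup on the common (hit) path instead of
// containsKey followed by get.
def addByUsage[C](sorted: TreeMap[java.lang.Long, ArrayList[C]], used: Long, consumer: C): Unit = {
  var list = sorted.get(used)
  if (list == null) {
    list = new ArrayList[C](1)
    sorted.put(used, list)
  }
  list.add(consumer)
}
```

If Java 8 APIs are acceptable in this code path, `sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1)).add(c)` would be shorter still.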





[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...

2017-01-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15009
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72204/
Test FAILed.





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-31 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r98744044
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
@@ -144,23 +148,53 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // spilling, avoid to have too many spilled files.
       if (got < required) {
         // Call spill() on other consumers to release memory
+        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
+        // which is just spilled in last few times and re-spilling on it will produce many small
+        // spill files.
+        TreeMap<Long, List<MemoryConsumer>> sortedConsumers =
+          new TreeMap<Long, List<MemoryConsumer>>();
         for (MemoryConsumer c: consumers) {
           if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            try {
-              long released = c.spill(required - got, consumer);
-              if (released > 0) {
-                logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                  Utils.bytesToString(released), c, consumer);
-                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-                if (got >= required) {
-                  break;
-                }
+            long key = c.getUsed();
+            List<MemoryConsumer> list = null;
+            if (sortedConsumers.containsKey(key)) {
+              list = sortedConsumers.get(key);
+              list.add(c);
+            } else {
+              list = new ArrayList<MemoryConsumer>(1);
+              list.add(c);
+              sortedConsumers.put(key, list);
+            }
+          }
+        }
+        while (!sortedConsumers.isEmpty()) {
+          // Get the consumer using the least memory more than the remaining required memory.
+          Map.Entry<Long, List<MemoryConsumer>> currentEntry =
+            sortedConsumers.ceilingEntry(required - got);
+          // No consumer has used memory more than the remaining required memory.
+          // Get the consumer of largest used memory.
+          if (currentEntry == null) {
+            currentEntry = sortedConsumers.lastEntry();
+          }
+          List<MemoryConsumer> cList = currentEntry.getValue();
+          MemoryConsumer c = cList.remove(cList.size() - 1);
+          if (cList.size() == 0) {
--- End diff --

`cList.isEmpty()`
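
For reviewers following along, the candidate-selection part of this loop reduces to `ceilingEntry` with a `lastEntry` fallback; a small self-contained sketch with hypothetical names (not the actual Spark code):

```
import java.util.TreeMap

// Pick the entry using the least memory that still covers `needed`;
// if none is large enough, fall back to the largest one.
def pickSpillCandidate(byUsage: TreeMap[java.lang.Long, String], needed: Long): Option[String] = {
  if (byUsage.isEmpty) None
  else Some(Option(byUsage.ceilingEntry(needed)).getOrElse(byUsage.lastEntry()).getValue)
}

val byUsage = new TreeMap[java.lang.Long, String]()
byUsage.put(10L, "small"); byUsage.put(50L, "medium"); byUsage.put(200L, "large")
assert(pickSpillCandidate(byUsage, 40L) == Some("medium"))   // least usage >= 40
assert(pickSpillCandidate(byUsage, 500L) == Some("large"))   // nothing covers 500, take largest
```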





[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...

2017-01-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15009
  
Merged build finished. Test FAILed.





[GitHub] spark pull request #16603: [SPARK-19244][Core] Sort MemoryConsumers accordin...

2017-01-31 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16603#discussion_r98743237
  
--- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
@@ -144,23 +148,53 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // spilling, avoid to have too many spilled files.
       if (got < required) {
         // Call spill() on other consumers to release memory
+        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
+        // which is just spilled in last few times and re-spilling on it will produce many small
+        // spill files.
+        TreeMap<Long, List<MemoryConsumer>> sortedConsumers =
+          new TreeMap<Long, List<MemoryConsumer>>();
--- End diff --

nit: you could just have used `new TreeMap<>()`.





[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...

2017-01-31 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15009
  
**[Test build #72204 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72204/testReport)**
 for PR 15009 at commit 
[`2fdcec9`](https://github.com/apache/spark/commit/2fdcec97f2455983684af8a2e6bcbe4214991972).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...

2017-01-31 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15009
  
**[Test build #72204 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72204/testReport)**
 for PR 15009 at commit 
[`2fdcec9`](https://github.com/apache/spark/commit/2fdcec97f2455983684af8a2e6bcbe4214991972).





[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...

2017-01-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15009
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72201/
Test PASSed.





[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...

2017-01-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15009
  
Build finished. Test PASSed.





[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...

2017-01-31 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15009
  
**[Test build #72201 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72201/consoleFull)**
 for PR 15009 at commit 
[`6a7ba5b`](https://github.com/apache/spark/commit/6a7ba5bfdd2cb165956992907f681ab3ad85154e).
 * This patch passes all tests.
 * This patch **does not merge cleanly**.
 * This patch adds no public classes.





[GitHub] spark issue #16751: [SPARK-19409][BUILD] Bump parquet version to 1.8.2

2017-01-31 Thread mallman
Github user mallman commented on the issue:

https://github.com/apache/spark/pull/16751
  
FYI, there are at least two workarounds in the Spark codebase which can 
potentially be removed as a consequence of this upgrade. For example:


https://github.com/apache/spark/blob/5de1737b02710e36f6804d2ae243d1aeb30a0b32/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala#L549-L558

and


https://github.com/apache/spark/blob/ca6391637212814b7c0bd14c434a6737da17b258/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala#L175-L178

These come immediately to mind. There may be others.

I think this PR would have been a good opportunity to remove these 
workarounds, but it's been closed and merged so that's water under the bridge.





[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...

2017-01-31 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/16757
  
cc @cloud-fan @windpiger @sameeragarwal 





[GitHub] spark pull request #14725: [SPARK-17161] [PYSPARK][ML] Add PySpark-ML JavaWr...

2017-01-31 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/14725#discussion_r98736777
  
--- Diff: python/pyspark/ml/wrapper.py ---
@@ -16,6 +16,10 @@
 #
 
 from abc import ABCMeta, abstractmethod
+import sys
--- End diff --

Does this look ok now @holdenk?





[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...

2017-01-31 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16757
  
**[Test build #72203 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72203/testReport)**
 for PR 16757 at commit 
[`6aad5d8`](https://github.com/apache/spark/commit/6aad5d844b26f675cf13d3c898da785271ffcc7c).





[GitHub] spark pull request #15120: [SPARK-4563][core] Allow driver to advertise a di...

2017-01-31 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15120#discussion_r98734873
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala ---
@@ -66,7 +66,7 @@ private[spark] class SparkConfigProvider(conf: 
JMap[String, String]) extends Con
 findEntry(key) match {
   case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
   case e: ConfigEntryWithDefaultString[_] => 
Option(e.defaultValueString)
-  case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
+  case e: FallbackConfigEntry[_] => get(e.fallback.key)
--- End diff --

What issue? The code you're commenting on does not exist in 1.6. If you're 
having issues, please ask questions on the mailing lists or use the bug tracker.





[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...

2017-01-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16757
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72202/
Test FAILed.





[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...

2017-01-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16757
  
Merged build finished. Test FAILed.





[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...

2017-01-31 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16757
  
**[Test build #72202 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72202/testReport)**
 for PR 16757 at commit 
[`dac7ec9`](https://github.com/apache/spark/commit/dac7ec99075ce98ebea92e108ad66b05537de396).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...

2017-01-31 Thread budde
Github user budde commented on the issue:

https://github.com/apache/spark/pull/16744
  
Pinging @tdas on this; it looks like you're the committer who has contributed the most to kinesis-asl.





[GitHub] spark issue #16281: [SPARK-13127][SQL] Update Parquet to 1.9.0

2017-01-31 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/16281
  
Thank you for sharing that information, @mallman .





[GitHub] spark pull request #16735: [SPARK-19228][SQL] Introduce tryParseDate method ...

2017-01-31 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16735#discussion_r98729386
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala ---
@@ -140,12 +137,21 @@ private[csv] object CSVInferSchema {
     }
   }
 
+  private def tryParseDate(field: String, options: CSVOptions): DataType = {
+    // This case infers a custom `dateFormat` is set.
+    if ((allCatch opt options.dateFormat.parse(field)).isDefined) {
+      DateType
+    } else {
+      tryParseTimestamp(field, options)
+    }
+  }
+
   private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
-    // This case infers a custom `dataFormat` is set.
+    // This case infers a custom `timestampFormat` is set.
     if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
       TimestampType
     } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
-      // We keep this for backwords competibility.
+      // We keep this for backwards compatibility.
       TimestampType
     } else {
       tryParseBoolean(field, options)
--- End diff --

@HyukjinKwon That does not work correctly if we put the change there.
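
For context, the schema inference here is a fall-through chain, so where a new step is inserted changes which types can ever be inferred; roughly (a simplified sketch of the shape, not the actual `CSVInferSchema` code):

```
import java.text.SimpleDateFormat
import scala.util.Try

// Simplified fall-through inference for one CSV field: each step either
// claims the field or delegates to the next, so ordering is significant.
def inferFieldType(field: String, dateFormat: SimpleDateFormat, tsFormat: SimpleDateFormat): String = {
  if (Try(field.toLong).isSuccess) "LongType"
  else if (Try(field.toDouble).isSuccess) "DoubleType"
  else if (Try(dateFormat.parse(field)).isSuccess) "DateType"        // tryParseDate
  else if (Try(tsFormat.parse(field)).isSuccess) "TimestampType"     // tryParseTimestamp
  else if (Try(field.toBoolean).isSuccess) "BooleanType"             // tryParseBoolean
  else "StringType"
}
```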





[GitHub] spark issue #16281: [SPARK-13127][SQL] Update Parquet to 1.9.0

2017-01-31 Thread mallman
Github user mallman commented on the issue:

https://github.com/apache/spark/pull/16281
  
FYI, we've been using 1.9.0 patched with a fix for 
https://issues.apache.org/jira/browse/PARQUET-783 without problem.





[GitHub] spark issue #16757: [SPARK-18609][SQL] Fix redundant Alias removal in the op...

2017-01-31 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16757
  
**[Test build #72202 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72202/testReport)**
 for PR 16757 at commit 
[`dac7ec9`](https://github.com/apache/spark/commit/dac7ec99075ce98ebea92e108ad66b05537de396).




