[GitHub] spark pull request #20915: [SPARK-23803][SQL] Support bucket pruning

2018-06-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20915#discussion_r192953873
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---
@@ -83,39 +88,43 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils {
   // To verify if the bucket pruning works, this function checks two 
conditions:
   //   1) Check if the pruned buckets (before filtering) are empty.
   //   2) Verify the final result is the same as the expected one
-  private def checkPrunedAnswers(
-  bucketSpec: BucketSpec,
-  bucketValues: Seq[Integer],
-  filterCondition: Column,
-  originalDataFrame: DataFrame): Unit = {
+  private def checkPrunedAnswers(bucketSpec: BucketSpec,
--- End diff --

nit: code style
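
For reference, a minimal sketch of the multi-line parameter style being asked for here (parameter names are taken from the quoted diff; the body is elided):

```
// Spark style: when the signature doesn't fit on one line, put each
// parameter on its own line, indented four spaces, and keep the return
// type on the last parameter line.
private def checkPrunedAnswers(
    bucketSpec: BucketSpec,
    bucketValues: Seq[Integer],
    filterCondition: Column,
    originalDataFrame: DataFrame): Unit = {
  // ... body unchanged ...
}
```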


---




[GitHub] spark pull request #20915: [SPARK-23803][SQL] Support bucket pruning

2018-06-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20915#discussion_r192953858
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -79,6 +158,13 @@ object FileSourceStrategy extends Strategy with Logging 
{
 
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
   logInfo(s"Pruning directories with: 
${partitionKeyFilters.mkString(",")}")
 
+  val bucketSpec: Option[BucketSpec] = fsRelation.bucketSpec
+  val bucketSet = if (shouldPruneBuckets(bucketSpec)) {
+getBuckets(normalizedFilters, bucketSpec.get)
--- End diff --

nit: maybe better to call it `genBucketSet` instead of `getBuckets`


---




[GitHub] spark pull request #20915: [SPARK-23803][SQL] Support bucket pruning

2018-06-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20915#discussion_r192953694
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -50,6 +51,84 @@ import org.apache.spark.sql.execution.SparkPlan
  * and add it.  Proceed to the next file.
  */
 object FileSourceStrategy extends Strategy with Logging {
+
+  // should prune buckets iff num buckets is greater than 1 and there is 
only one bucket column
+  private def shouldPruneBuckets(bucketSpec: Option[BucketSpec]): Boolean 
= {
+bucketSpec match {
+  case Some(spec) => spec.bucketColumnNames.length == 1 && 
spec.numBuckets > 1
+  case None => false
+}
+  }
+
+  private def getExpressionBuckets(expr: Expression,
+   bucketColumnName: String,
+   numBuckets: Int): BitSet = {
+
+def getMatchedBucketBitSet(attr: Attribute, v: Any): BitSet = {
+  val matchedBuckets = new BitSet(numBuckets)
+  matchedBuckets.set(BucketingUtils.getBucketIdFromValue(attr, 
numBuckets, v))
+  matchedBuckets
+}
+
+expr match {
+  case expressions.EqualTo(a: Attribute, Literal(v, _)) if a.name == 
bucketColumnName =>
+getMatchedBucketBitSet(a, v)
+  case expressions.EqualTo(Literal(v, _), a: Attribute) if a.name == 
bucketColumnName =>
+getMatchedBucketBitSet(a, v)
+  case expressions.EqualNullSafe(a: Attribute, Literal(v, _)) if 
a.name == bucketColumnName =>
+getMatchedBucketBitSet(a, v)
+  case expressions.EqualNullSafe(Literal(v, _), a: Attribute) if 
a.name == bucketColumnName =>
+getMatchedBucketBitSet(a, v)
+  case expressions.In(a: Attribute, list)
+if list.forall(_.isInstanceOf[Literal]) && a.name == 
bucketColumnName =>
+val valuesSet = list.map(e => e.eval(EmptyRow))
+valuesSet
+  .map(v => getMatchedBucketBitSet(a, v))
+  .fold(new BitSet(numBuckets))(_ | _)
+  case expressions.IsNull(a: Attribute) if a.name == bucketColumnName 
=>
+getMatchedBucketBitSet(a, null)
+  case expressions.And(left, right) =>
+getExpressionBuckets(left, bucketColumnName, numBuckets) &
+  getExpressionBuckets(right, bucketColumnName, numBuckets)
+  case expressions.Or(left, right) =>
+getExpressionBuckets(left, bucketColumnName, numBuckets) |
+getExpressionBuckets(right, bucketColumnName, numBuckets)
+  case _ =>
+val matchedBuckets = new BitSet(numBuckets)
+matchedBuckets.setUntil(numBuckets)
+matchedBuckets
+}
+  }
+
+  private def getBuckets(normalizedFilters: Seq[Expression],
--- End diff --

nit: code style


---




[GitHub] spark pull request #20915: [SPARK-23803][SQL] Support bucket pruning

2018-06-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20915#discussion_r192953586
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -50,6 +51,84 @@ import org.apache.spark.sql.execution.SparkPlan
  * and add it.  Proceed to the next file.
  */
 object FileSourceStrategy extends Strategy with Logging {
+
+  // should prune buckets iff num buckets is greater than 1 and there is 
only one bucket column
+  private def shouldPruneBuckets(bucketSpec: Option[BucketSpec]): Boolean 
= {
+bucketSpec match {
+  case Some(spec) => spec.bucketColumnNames.length == 1 && 
spec.numBuckets > 1
+  case None => false
+}
+  }
+
+  private def getExpressionBuckets(expr: Expression,
+   bucketColumnName: String,
+   numBuckets: Int): BitSet = {
+
+def getMatchedBucketBitSet(attr: Attribute, v: Any): BitSet = {
+  val matchedBuckets = new BitSet(numBuckets)
+  matchedBuckets.set(BucketingUtils.getBucketIdFromValue(attr, 
numBuckets, v))
+  matchedBuckets
+}
+
+expr match {
+  case expressions.EqualTo(a: Attribute, Literal(v, _)) if a.name == 
bucketColumnName =>
+getMatchedBucketBitSet(a, v)
+  case expressions.EqualTo(Literal(v, _), a: Attribute) if a.name == 
bucketColumnName =>
+getMatchedBucketBitSet(a, v)
+  case expressions.EqualNullSafe(a: Attribute, Literal(v, _)) if 
a.name == bucketColumnName =>
+getMatchedBucketBitSet(a, v)
+  case expressions.EqualNullSafe(Literal(v, _), a: Attribute) if 
a.name == bucketColumnName =>
+getMatchedBucketBitSet(a, v)
+  case expressions.In(a: Attribute, list)
+if list.forall(_.isInstanceOf[Literal]) && a.name == 
bucketColumnName =>
+val valuesSet = list.map(e => e.eval(EmptyRow))
+valuesSet
+  .map(v => getMatchedBucketBitSet(a, v))
+  .fold(new BitSet(numBuckets))(_ | _)
--- End diff --

Can't we create one bit set for all the matched buckets, instead of creating many bit sets and merging them?
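
A hedged sketch of what this seems to suggest for the `In` case: set every matching bucket id on a single `BitSet` instead of allocating one bit set per literal and folding them together (names follow the quoted diff):

```
case expressions.In(a: Attribute, list)
    if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
  // One shared BitSet; set a bit per literal value instead of merging
  // per-value bit sets.
  val matchedBuckets = new BitSet(numBuckets)
  list.foreach { e =>
    matchedBuckets.set(
      BucketingUtils.getBucketIdFromValue(a, numBuckets, e.eval(EmptyRow)))
  }
  matchedBuckets
```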


---




[GitHub] spark pull request #20915: [SPARK-23803][SQL] Support bucket pruning

2018-06-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20915#discussion_r192953456
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -50,6 +51,84 @@ import org.apache.spark.sql.execution.SparkPlan
  * and add it.  Proceed to the next file.
  */
 object FileSourceStrategy extends Strategy with Logging {
+
+  // should prune buckets iff num buckets is greater than 1 and there is 
only one bucket column
+  private def shouldPruneBuckets(bucketSpec: Option[BucketSpec]): Boolean 
= {
+bucketSpec match {
+  case Some(spec) => spec.bucketColumnNames.length == 1 && 
spec.numBuckets > 1
+  case None => false
+}
+  }
+
+  private def getExpressionBuckets(expr: Expression,
+   bucketColumnName: String,
+   numBuckets: Int): BitSet = {
+
+def getMatchedBucketBitSet(attr: Attribute, v: Any): BitSet = {
+  val matchedBuckets = new BitSet(numBuckets)
+  matchedBuckets.set(BucketingUtils.getBucketIdFromValue(attr, 
numBuckets, v))
+  matchedBuckets
+}
+
+expr match {
+  case expressions.EqualTo(a: Attribute, Literal(v, _)) if a.name == 
bucketColumnName =>
+getMatchedBucketBitSet(a, v)
+  case expressions.EqualTo(Literal(v, _), a: Attribute) if a.name == 
bucketColumnName =>
+getMatchedBucketBitSet(a, v)
+  case expressions.EqualNullSafe(a: Attribute, Literal(v, _)) if 
a.name == bucketColumnName =>
+getMatchedBucketBitSet(a, v)
+  case expressions.EqualNullSafe(Literal(v, _), a: Attribute) if 
a.name == bucketColumnName =>
+getMatchedBucketBitSet(a, v)
+  case expressions.In(a: Attribute, list)
--- End diff --

we should catch `InSet` as well
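
For illustration, a hedged sketch of an extra case for `InSet`, which already holds an evaluated `Set[Any]` of values so no `eval` is needed (the surrounding names come from the quoted diff):

```
case expressions.InSet(a: Attribute, hset) if a.name == bucketColumnName =>
  val matchedBuckets = new BitSet(numBuckets)
  // hset values are already evaluated, so they can be mapped to bucket ids directly.
  hset.foreach { v =>
    matchedBuckets.set(BucketingUtils.getBucketIdFromValue(a, numBuckets, v))
  }
  matchedBuckets
```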


---




[GitHub] spark pull request #20915: [SPARK-23803][SQL] Support bucket pruning

2018-06-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20915#discussion_r192953337
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -50,6 +51,84 @@ import org.apache.spark.sql.execution.SparkPlan
  * and add it.  Proceed to the next file.
  */
 object FileSourceStrategy extends Strategy with Logging {
+
+  // should prune buckets iff num buckets is greater than 1 and there is 
only one bucket column
+  private def shouldPruneBuckets(bucketSpec: Option[BucketSpec]): Boolean 
= {
+bucketSpec match {
+  case Some(spec) => spec.bucketColumnNames.length == 1 && 
spec.numBuckets > 1
+  case None => false
+}
+  }
+
+  private def getExpressionBuckets(expr: Expression,
+   bucketColumnName: String,
+   numBuckets: Int): BitSet = {
+
+def getMatchedBucketBitSet(attr: Attribute, v: Any): BitSet = {
+  val matchedBuckets = new BitSet(numBuckets)
+  matchedBuckets.set(BucketingUtils.getBucketIdFromValue(attr, 
numBuckets, v))
+  matchedBuckets
+}
+
+expr match {
+  case expressions.EqualTo(a: Attribute, Literal(v, _)) if a.name == 
bucketColumnName =>
--- End diff --

use `Equality` to match both `EqualTo` and `EqualNullSafe`
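
A hedged sketch of the suggested pattern: the `Equality` extractor in `org.apache.spark.sql.catalyst.expressions` matches both `EqualTo` and `EqualNullSafe`, so the four equality cases collapse into two:

```
case expressions.Equality(a: Attribute, Literal(v, _)) if a.name == bucketColumnName =>
  getMatchedBucketBitSet(a, v)
case expressions.Equality(Literal(v, _), a: Attribute) if a.name == bucketColumnName =>
  getMatchedBucketBitSet(a, v)
```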


---




[GitHub] spark pull request #20915: [SPARK-23803][SQL] Support bucket pruning

2018-06-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20915#discussion_r192953104
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -50,6 +51,84 @@ import org.apache.spark.sql.execution.SparkPlan
  * and add it.  Proceed to the next file.
  */
 object FileSourceStrategy extends Strategy with Logging {
+
+  // should prune buckets iff num buckets is greater than 1 and there is 
only one bucket column
+  private def shouldPruneBuckets(bucketSpec: Option[BucketSpec]): Boolean 
= {
+bucketSpec match {
+  case Some(spec) => spec.bucketColumnNames.length == 1 && 
spec.numBuckets > 1
+  case None => false
+}
+  }
+
+  private def getExpressionBuckets(expr: Expression,
--- End diff --

nit: code style
```
def xxx(
    para1: A,
    para2: B): XXX
```


---




[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r192949349
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(_, _), ArrayType(_, _)) =>
+TypeCheckResult.TypeCheckSuccess
+  case _ =>
+TypeCheckResult.TypeCheckFailure("The given two arguments should 
be an array")
+}
+  }
+
+  override def dataType: DataType = {
+MapType(
+  keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+  valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+  valueContainsNull = 
right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def nullSafeEval(keyArray: Any, valueArray: Any): Any = {
+val keyArrayData = keyArray.asInstanceOf[ArrayData]
+val valueArrayData = valueArray.asInstanceOf[ArrayData]
+if (keyArrayData.numElements != valueArrayData.numElements) {
+  throw new RuntimeException("The given two arrays should have the 
same length")
+}
+val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+if (leftArrayType.containsNull) {
+  if (keyArrayData.toArray(leftArrayType.elementType).contains(null)) {
+throw new RuntimeException("Cannot use null as map key!")
+  }
+}
+new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy())
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => {
+  val arrayBasedMapData = classOf[ArrayBasedMapData].getName
+  val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+  val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else 
{
+val leftArrayTypeTerm = ctx.addReferenceObj("leftArrayType", 
leftArrayType.elementType)
+val array = ctx.freshName("array")
+val i = ctx.freshName("i")
+s"""
+   |Object[] $array = 
$keyArrayData.toObjectArray($leftArrayTypeTerm);
+   |for (int $i = 0; $i < $array.length; $i++) {
+   |  if ($array[$i] == null) {
+   |throw new RuntimeException("Cannot use null as map key!");
+   |  }
+   |}
--- End diff --

This code should work if we evaluate each element so that `isNullAt()` is valid.

I think my mistake is that the elements in `keyArrayData` and `valueArrayData` are not currently evaluated.
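
Purely as an illustration (not necessarily the final fix), the interpreted path could reject null keys by walking `keyArrayData` with `isNullAt` instead of materializing the whole array:

```
// Hedged sketch: assumes the key elements are already evaluated so that
// isNullAt() is valid; rejects null keys without toArray(...).contains(null).
var i = 0
while (i < keyArrayData.numElements()) {
  if (keyArrayData.isNullAt(i)) {
    throw new RuntimeException("Cannot use null as map key!")
  }
  i += 1
}
```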


---




[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19691
  
**[Test build #91473 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91473/testReport)**
 for PR 19691 at commit 
[`6b18939`](https://github.com/apache/spark/commit/6b189398c138e2fb17085ece1bd36d30cbc0aa44).


---




[GitHub] spark issue #21370: [SPARK-24215][PySpark] Implement _repr_html_ for datafra...

2018-06-04 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21370
  
Thanks @HyukjinKwon and all reviewers.


---




[GitHub] spark issue #21494: [WIP][SPARK-24375][Prototype] Support barrier scheduling

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21494
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21494: [WIP][SPARK-24375][Prototype] Support barrier scheduling

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21494
  
**[Test build #91471 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91471/testReport)**
 for PR 21494 at commit 
[`84cdc68`](https://github.com/apache/spark/commit/84cdc68c33e913335d8a0a26a388a32ba6e9cf18).
 * This patch **fails from timeout after a configured wait of `300m`**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21494: [WIP][SPARK-24375][Prototype] Support barrier scheduling

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21494
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91471/
Test FAILed.


---




[GitHub] spark issue #20915: [SPARK-23803][SQL] Support bucket pruning

2018-06-04 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20915
  
@sabanas @pwoody Sorry for the late reply. We are migrating Parquet, ORC, and the other data sources to Data Source API V2, so we are trying to address all these issues in the new code. @cloud-fan and @gengliangwang are leading the migration effort now.


---




[GitHub] spark issue #21119: [SPARK-19826][ML][PYTHON]add spark.ml Python API for PIC

2018-06-04 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/21119
  
@huaxingao We updated the Scala/Java API in 
https://github.com/apache/spark/pull/21493. Could you update this PR for the 
Python API? It should be similar to the PrefixSpan Python API 
(https://github.com/apache/spark/commit/90ae98d1accb3e4b7d381de072257bdece8dd7e0),
 which is neither a transformer nor an estimator. Let me know if you don't have 
time. @WeichenXu123  could update the Python API as well.


---




[GitHub] spark pull request #21493: [SPARK-15784] Add Power Iteration Clustering to s...

2018-06-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21493


---




[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml

2018-06-04 Thread mengxr
Github user mengxr commented on the issue:

https://github.com/apache/spark/pull/21493
  
LGTM. Merged into master. Thanks! @WeichenXu123 Could you also add the 
Python API?


---




[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...

2018-06-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21477
  
Let me cc @HeartSaVioR too since he's looking through the SS code a bit.


---




[GitHub] spark issue #21447: [SPARK-24339][SQL]Add project for transform/map/reduce s...

2018-06-04 Thread xdcjie
Github user xdcjie commented on the issue:

https://github.com/apache/spark/pull/21447
  
@maropu @gatorsmile Can you help me review this PR? Thanks.


---




[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-04 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
@arunmahadevan 
I didn't add the metric to StateOperatorProgress because this behavior is specific to HDFSBackedStateStoreProvider (though it is the only implementation available in Apache Spark), so I'm not sure this metric can be treated as a general one. (@tdas what do you think about this?)

Btw, the cache is only cleaned up when the maintenance operation runs, so there could be more than 100 versions in the map. I'm not sure why it shows 150x, but I couldn't find a missing spot in the patch. Maybe the issue comes from SizeEstimator.estimate()?

One thing we need to check is how SizeEstimator.estimate() calculates the memory usage when UnsafeRow objects are shared across versions. If SizeEstimator adds the size of an object every time it is referenced, it will report much higher memory usage than the actual usage.
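
One hedged way to probe that (purely illustrative; `SizeEstimator` here is the `org.apache.spark.util.SizeEstimator` developer API) is to compare the estimate of a container that references the same object twice against the single-object estimate:

```
import org.apache.spark.util.SizeEstimator

// Stand-in for a row object that is shared across versions.
val row = new Array[Byte](1024)
val single = SizeEstimator.estimate(row)
// If this comes out roughly 2x `single` plus container overhead, shared
// references are effectively being counted once per reference.
val shared = SizeEstimator.estimate(Seq(row, row))
println(s"single=$single, referenced twice=$shared")
```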


---




[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21482#discussion_r192931084
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala
 ---
@@ -199,6 +199,50 @@ case class Nvl2(expr1: Expression, expr2: Expression, 
expr3: Expression, child:
   override def sql: String = s"$prettyName(${expr1.sql}, ${expr2.sql}, 
${expr3.sql})"
 }
 
+/**
+ * Evaluates to `true` iff it's Infinity.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - Returns true if expr evaluates to infinite else 
returns false ",
+  examples = """
+Examples:
+  > SELECT _FUNC_(1/0);
+   True
+  > SELECT _FUNC_(5);
+   False
+  """)
+case class IsInf(child: Expression) extends UnaryExpression
+  with Predicate with ImplicitCastInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = 
Seq(TypeCollection(DoubleType, FloatType))
+
+  override def nullable: Boolean = false
+
+  override def eval(input: InternalRow): Boolean = {
+val value = child.eval(input)
+if (value == null) {
+  false
+} else {
+  child.dataType match {
+case DoubleType => value.asInstanceOf[Double].isInfinity
+case FloatType => value.asInstanceOf[Float].isInfinity
+  }
+}
+  }
+
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val eval = child.genCode(ctx)
+child.dataType match {
+  case DoubleType | FloatType =>
--- End diff --

I think we will only see double and float types here because of 
`inputTypes`.


---




[GitHub] spark issue #21490: [SPARK-24462][SS] Initialize the offsets correctly when ...

2018-06-04 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/21490
  
@arunmahadevan I have already proposed a fix for this issue in https://github.com/apache/spark/pull/20958, but based on the comments it seems the fix is not strictly necessary, and the PR is still pending review.


---




[GitHub] spark pull request #21462: [SPARK-24428][K8S] Fix unused code

2018-06-04 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21462#discussion_r192925875
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh ---
@@ -46,12 +46,6 @@ shift 1
 SPARK_CLASSPATH="$SPARK_CLASSPATH:${SPARK_HOME}/jars/*"
 env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' > 
/tmp/java_opts.txt
 readarray -t SPARK_JAVA_OPTS < /tmp/java_opts.txt
--- End diff --

it would be confusing -- this is local though right, not exported?


---




[GitHub] spark pull request #21313: [SPARK-24187][R][SQL]Add array_join function to S...

2018-06-04 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21313#discussion_r192925340
  
--- Diff: R/pkg/R/functions.R ---
@@ -3006,6 +3008,27 @@ setMethod("array_contains",
 column(jc)
   })
 
+#' @details
+#' \code{array_join}: Concatenates the elements of column using the 
delimiter.
+#' Null values are replaced with nullReplacement if set, otherwise they 
are ignored.
+#'
+#' @param delimiter a character string that is used to concatenate the 
elements of column.
+#' @param nullReplacement a character string that is used to replace the 
Null values.
--- End diff --

could you change this to
`nullReplacement an optional character string `


---




[GitHub] spark issue #18447: [SPARK-21232][SQL][SparkR][PYSPARK] New built-in SQL fun...

2018-06-04 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/18447
  
I guess we still have an open question on 
https://github.com/apache/spark/pull/18447/files#r130032578


---




[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...

2018-06-04 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/21390
  
Thanks @JoshRosen for the very detailed and thoughtful reply.
Agreed that TTL could be fragile, but I am very concerned about this point:

> There is a related issue where shuffle files can be leaked indefinitely 
following executor death because the external shuffle service is never directly 
told that shuffles are safe to remove (the context cleaner sends RPCs to 
executors and executors clean up their own shuffle files). That issue is 
substantially harder to fix, though, since it likely requires protocol changes 
to the shuffle service or an inversion-of-control where the shuffle service can 
periodically ask the driver "do any of these shuffle IDs correspond to cleaned 
shuffles?".

So I will probably follow up with you at some point.



---




[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21493
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21493
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91472/
Test PASSed.


---




[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21493
  
**[Test build #91472 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91472/testReport)**
 for PR 21493 at commit 
[`15c087f`](https://github.com/apache/spark/commit/15c087fdf3f3f84732e084f7a327813065e678a1).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21368: [SPARK-16451][repl] Fail shell if SparkSession fa...

2018-06-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21368


---




[GitHub] spark issue #21368: [SPARK-16451][repl] Fail shell if SparkSession fails to ...

2018-06-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21368
  
Merged to master


---




[GitHub] spark pull request #21370: [SPARK-24215][PySpark] Implement _repr_html_ for ...

2018-06-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21370


---




[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21366
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91464/
Test PASSed.


---




[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21366
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21366
  
**[Test build #91464 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91464/testReport)**
 for PR 21366 at commit 
[`b5c0fbf`](https://github.com/apache/spark/commit/b5c0fbf19a1e393a6df18b362558cb39afa2085d).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21370: [SPARK-24215][PySpark] Implement _repr_html_ for datafra...

2018-06-04 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21370
  
Merged to master


---




[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21366
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91463/
Test PASSed.


---




[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21366
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21366
  
**[Test build #91463 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91463/testReport)**
 for PR 21366 at commit 
[`7bf49ba`](https://github.com/apache/spark/commit/7bf49ba9860c2ad7bab916aafd331a404c6987b5).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21482
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21482
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91467/
Test FAILed.


---




[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21482
  
**[Test build #91467 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91467/testReport)**
 for PR 21482 at commit 
[`432c61b`](https://github.com/apache/spark/commit/432c61b4a2297bc12065c73c9eb826e0a8e02b47).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r192920197
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done open after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) is will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (partition_id, epoch_id))
+... return True
+... 

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r192921387
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
 ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python._
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.sql.ForeachWriter
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{NextIterator, Utils}
+
+class PythonForeachWriter(func: PythonFunction, schema: StructType)
+  extends ForeachWriter[UnsafeRow] {
+
+  private lazy val context = TaskContext.get()
+  private lazy val buffer = new PythonForeachWriter.UnsafeRowBuffer(
+context.taskMemoryManager, new 
File(Utils.getLocalDir(SparkEnv.get.conf)), schema.fields.length)
+  private lazy val inputRowIterator = buffer.iterator
+
+  private lazy val inputByteIterator = {
+EvaluatePython.registerPicklers()
+val objIterator = inputRowIterator.map { row => 
EvaluatePython.toJava(row, schema) }
+new SerDeUtil.AutoBatchedPickler(objIterator)
+  }
+
+  private lazy val pythonRunner = {
+val conf = SparkEnv.get.conf
+val bufferSize = conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+PythonRunner(func, bufferSize, reuseWorker)
+  }
+
+  private lazy val outputIterator =
+pythonRunner.compute(inputByteIterator, context.partitionId(), context)
+
+  override def open(partitionId: Long, version: Long): Boolean = {
+outputIterator  // initialize everything
+TaskContext.get.addTaskCompletionListener { _ => buffer.close() }
+true
+  }
+
+  override def process(value: UnsafeRow): Unit = {
+buffer.add(value)
+  }
+
+  override def close(errorOrNull: Throwable): Unit = {
+buffer.allRowsAdded()
+if (outputIterator.hasNext) outputIterator.next() // to throw python 
exception if there was one
+  }
+}
+
+object PythonForeachWriter {
+
+  /**
+   * A buffer that is designed for the sole purpose of buffering 
UnsafeRows in PythonForeahWriter.
+   * It is designed to be used with only 1 writer thread (i.e. JVM task 
thread) and only 1 reader
+   * thread (i.e. PythonRunner writing thread that reads from the buffer 
and writes to the Python
+   * worker stdin). Adds to the buffer are non-blocking, and reads through 
the buffer's iterator
+   * are blocking, that is, it blocks until new data is available or all 
data has been added.
+   *
+   * Internally, it uses a [[HybridRowQueue]] to buffer the rows in a 
practically unlimited queue
+   * across memory and local disk. However, HybridRowQueue is designed to 
be used only with
+   * EvalPythonExec where the reader is always behind the the writer, that 
is, the reader does not
+   * try to read n+1 rows if the writer has only written n rows at any 
point of time. This
+   * assumption is not true for PythonForeachWriter where rows may be 
added at a different rate as
+   * they are consumed by the python worker. Hence, to maintain the 
invariant of the reader being
+   * behind the writer while using HybridRowQueue, the buffer does the 
following
+   * - Keeps a count of the rows in the HybridRowQueue
+   * - Blocks the buffer's consuming iterator when the count is 0 so that 
the reader does not
+   *   try to read more rows than what has been written.
+   *
+   * The implementation of the blocking iterator (ReentrantLock, 
Condition, etc.) has been 

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r192920809
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done open after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) is will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (partition_id, epoch_id))
+... return True
+... 

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r192921031
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self):
 finally:
 q.stop()
 shutil.rmtree(tmpPath)
+'''
 
+class ForeachWriterTester:
+
+def __init__(self, spark):
+self.spark = spark
+self.input_dir = tempfile.mkdtemp()
+self.open_events_dir = tempfile.mkdtemp()
+self.process_events_dir = tempfile.mkdtemp()
+self.close_events_dir = tempfile.mkdtemp()
+
+def write_open_event(self, partitionId, epochId):
+self._write_event(
+self.open_events_dir,
+{'partition': partitionId, 'epoch': epochId})
+
+def write_process_event(self, row):
+self._write_event(self.process_events_dir, {'value': 'text'})
+
+def write_close_event(self, error):
+self._write_event(self.close_events_dir, {'error': str(error)})
+
+def write_input_file(self):
+self._write_event(self.input_dir, "text")
+
+def open_events(self):
+return self._read_events(self.open_events_dir, 'partition INT, 
epoch INT')
+
+def process_events(self):
+return self._read_events(self.process_events_dir, 'value 
STRING')
+
+def close_events(self):
+return self._read_events(self.close_events_dir, 'error STRING')
+
+def run_streaming_query_on_writer(self, writer, num_files):
+try:
+sdf = 
self.spark.readStream.format('text').load(self.input_dir)
+sq = sdf.writeStream.foreach(writer).start()
+for i in range(num_files):
+self.write_input_file()
+sq.processAllAvailable()
+sq.stop()
+finally:
+self.stop_all()
+
+def _read_events(self, dir, json):
+rows = self.spark.read.schema(json).json(dir).collect()
+dicts = [row.asDict() for row in rows]
+return dicts
+
+def _write_event(self, dir, event):
+import random
+file = open(os.path.join(dir, str(random.randint(0, 10))), 
'w')
+file.write("%s\n" % str(event))
+file.close()
--- End diff --

should it be put in finally too?


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r192920358
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done open after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) is will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (partition_id, epoch_id))
+... return True
+... 

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r192921253
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self):
 finally:
 q.stop()
 shutil.rmtree(tmpPath)
+'''
 
+class ForeachWriterTester:
+
+def __init__(self, spark):
+self.spark = spark
+self.input_dir = tempfile.mkdtemp()
+self.open_events_dir = tempfile.mkdtemp()
+self.process_events_dir = tempfile.mkdtemp()
+self.close_events_dir = tempfile.mkdtemp()
+
+def write_open_event(self, partitionId, epochId):
+self._write_event(
+self.open_events_dir,
+{'partition': partitionId, 'epoch': epochId})
+
+def write_process_event(self, row):
+self._write_event(self.process_events_dir, {'value': 'text'})
+
+def write_close_event(self, error):
+self._write_event(self.close_events_dir, {'error': str(error)})
+
+def write_input_file(self):
+self._write_event(self.input_dir, "text")
+
+def open_events(self):
+return self._read_events(self.open_events_dir, 'partition INT, 
epoch INT')
+
+def process_events(self):
+return self._read_events(self.process_events_dir, 'value 
STRING')
+
+def close_events(self):
+return self._read_events(self.close_events_dir, 'error STRING')
+
+def run_streaming_query_on_writer(self, writer, num_files):
+try:
+sdf = 
self.spark.readStream.format('text').load(self.input_dir)
+sq = sdf.writeStream.foreach(writer).start()
+for i in range(num_files):
+self.write_input_file()
+sq.processAllAvailable()
+sq.stop()
+finally:
+self.stop_all()
+
+def _read_events(self, dir, json):
+rows = self.spark.read.schema(json).json(dir).collect()
+dicts = [row.asDict() for row in rows]
+return dicts
+
+def _write_event(self, dir, event):
+import random
+file = open(os.path.join(dir, str(random.randint(0, 10))), 
'w')
+file.write("%s\n" % str(event))
+file.close()
+
+def stop_all(self):
+for q in self.spark._wrapped.streams.active:
+q.stop()
+
+def __getstate__(self):
+return (self.open_events_dir, self.process_events_dir, 
self.close_events_dir)
+
+def __setstate__(self, state):
+self.open_events_dir, self.process_events_dir, 
self.close_events_dir = state
+
+def test_streaming_foreach_with_simple_function(self):
+tester = self.ForeachWriterTester(self.spark)
+
+def foreach_func(row):
+tester.write_process_event(row)
+
+tester.run_streaming_query_on_writer(foreach_func, 2)
+self.assertEqual(len(tester.process_events()), 2)
+
+def test_streaming_foreach_with_basic_open_process_close(self):
+tester = self.ForeachWriterTester(self.spark)
+
+class ForeachWriter:
+def open(self, partitionId, epochId):
+tester.write_open_event(partitionId, epochId)
+return True
+
+def process(self, row):
+tester.write_process_event(row)
+
+def close(self, error):
+tester.write_close_event(error)
+
+tester.run_streaming_query_on_writer(ForeachWriter(), 2)
+
+open_events = tester.open_events()
+self.assertEqual(len(open_events), 2)
+self.assertSetEqual(set([e['epoch'] for e in open_events]), {0, 1})
+
+self.assertEqual(len(tester.process_events()), 2)
+
+close_events = tester.close_events()
+self.assertEqual(len(close_events), 2)
+self.assertSetEqual(set([e['error'] for e in close_events]), 
{'None'})
+
+def test_streaming_foreach_with_open_returning_false(self):
+tester = self.ForeachWriterTester(self.spark)
+
+class ForeachWriter:
+def open(self, partitionId, epochId):
+tester.write_open_event(partitionId, epochId)
+return False
+
+def process(self, row):
+tester.write_process_event(row)
+
+def close(self, error):
+

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r192921274
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self):
 finally:
 q.stop()
 shutil.rmtree(tmpPath)
+'''
 
+class ForeachWriterTester:
+
+def __init__(self, spark):
+self.spark = spark
+self.input_dir = tempfile.mkdtemp()
+self.open_events_dir = tempfile.mkdtemp()
+self.process_events_dir = tempfile.mkdtemp()
+self.close_events_dir = tempfile.mkdtemp()
+
+def write_open_event(self, partitionId, epochId):
+self._write_event(
+self.open_events_dir,
+{'partition': partitionId, 'epoch': epochId})
+
+def write_process_event(self, row):
+self._write_event(self.process_events_dir, {'value': 'text'})
+
+def write_close_event(self, error):
+self._write_event(self.close_events_dir, {'error': str(error)})
+
+def write_input_file(self):
+self._write_event(self.input_dir, "text")
+
+def open_events(self):
+return self._read_events(self.open_events_dir, 'partition INT, 
epoch INT')
+
+def process_events(self):
+return self._read_events(self.process_events_dir, 'value 
STRING')
+
+def close_events(self):
+return self._read_events(self.close_events_dir, 'error STRING')
+
+def run_streaming_query_on_writer(self, writer, num_files):
+try:
+sdf = 
self.spark.readStream.format('text').load(self.input_dir)
+sq = sdf.writeStream.foreach(writer).start()
+for i in range(num_files):
+self.write_input_file()
+sq.processAllAvailable()
+sq.stop()
+finally:
+self.stop_all()
+
+def _read_events(self, dir, json):
+rows = self.spark.read.schema(json).json(dir).collect()
+dicts = [row.asDict() for row in rows]
+return dicts
+
+def _write_event(self, dir, event):
+import random
+file = open(os.path.join(dir, str(random.randint(0, 10))), 
'w')
+file.write("%s\n" % str(event))
+file.close()
+
+def stop_all(self):
+for q in self.spark._wrapped.streams.active:
+q.stop()
+
+def __getstate__(self):
+return (self.open_events_dir, self.process_events_dir, 
self.close_events_dir)
+
+def __setstate__(self, state):
+self.open_events_dir, self.process_events_dir, 
self.close_events_dir = state
+
+def test_streaming_foreach_with_simple_function(self):
+tester = self.ForeachWriterTester(self.spark)
+
+def foreach_func(row):
+tester.write_process_event(row)
+
+tester.run_streaming_query_on_writer(foreach_func, 2)
+self.assertEqual(len(tester.process_events()), 2)
+
+def test_streaming_foreach_with_basic_open_process_close(self):
+tester = self.ForeachWriterTester(self.spark)
+
+class ForeachWriter:
+def open(self, partitionId, epochId):
+tester.write_open_event(partitionId, epochId)
+return True
+
+def process(self, row):
+tester.write_process_event(row)
+
+def close(self, error):
+tester.write_close_event(error)
+
+tester.run_streaming_query_on_writer(ForeachWriter(), 2)
+
+open_events = tester.open_events()
+self.assertEqual(len(open_events), 2)
+self.assertSetEqual(set([e['epoch'] for e in open_events]), {0, 1})
+
+self.assertEqual(len(tester.process_events()), 2)
+
+close_events = tester.close_events()
+self.assertEqual(len(close_events), 2)
+self.assertSetEqual(set([e['error'] for e in close_events]), 
{'None'})
+
+def test_streaming_foreach_with_open_returning_false(self):
+tester = self.ForeachWriterTester(self.spark)
+
+class ForeachWriter:
+def open(self, partitionId, epochId):
+tester.write_open_event(partitionId, epochId)
+return False
+
+def process(self, row):
+tester.write_process_event(row)
+
+def close(self, error):
+

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r192921061
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self):
 finally:
 q.stop()
 shutil.rmtree(tmpPath)
+'''
 
+class ForeachWriterTester:
+
+def __init__(self, spark):
+self.spark = spark
+self.input_dir = tempfile.mkdtemp()
+self.open_events_dir = tempfile.mkdtemp()
+self.process_events_dir = tempfile.mkdtemp()
+self.close_events_dir = tempfile.mkdtemp()
+
+def write_open_event(self, partitionId, epochId):
+self._write_event(
+self.open_events_dir,
+{'partition': partitionId, 'epoch': epochId})
+
+def write_process_event(self, row):
+self._write_event(self.process_events_dir, {'value': 'text'})
+
+def write_close_event(self, error):
+self._write_event(self.close_events_dir, {'error': str(error)})
+
+def write_input_file(self):
+self._write_event(self.input_dir, "text")
+
+def open_events(self):
+return self._read_events(self.open_events_dir, 'partition INT, 
epoch INT')
+
+def process_events(self):
+return self._read_events(self.process_events_dir, 'value 
STRING')
+
+def close_events(self):
+return self._read_events(self.close_events_dir, 'error STRING')
+
+def run_streaming_query_on_writer(self, writer, num_files):
+try:
+sdf = 
self.spark.readStream.format('text').load(self.input_dir)
+sq = sdf.writeStream.foreach(writer).start()
+for i in range(num_files):
+self.write_input_file()
+sq.processAllAvailable()
+sq.stop()
+finally:
+self.stop_all()
+
+def _read_events(self, dir, json):
+rows = self.spark.read.schema(json).json(dir).collect()
+dicts = [row.asDict() for row in rows]
+return dicts
+
+def _write_event(self, dir, event):
+import random
+file = open(os.path.join(dir, str(random.randint(0, 10))), 
'w')
--- End diff --

nit: `file` shadows the builtin function `file` .. 
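
One way to address this, as a hedged sketch (a hypothetical rewrite of `_write_event`, not the author's change): rename the variable and let a `with` block close the handle.

```
import os
import random

def _write_event(self, dir, event):
    # Avoid shadowing the Python 2 builtin `file`; the context manager also
    # closes the handle even if the write raises.
    path = os.path.join(dir, str(random.randint(0, 10)))
    with open(path, 'w') as f:
        f.write("%s\n" % str(event))
```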


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r192920553
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible for all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserialized copy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if it exists) will be called if the 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using an object with a process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (partition_id, epoch_id))
+... return True
+... 

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r192920157
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible for all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserialized copy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if it exists) will be called if the 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using an object with a process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (partition_id, epoch_id))
+... return True
+... 
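
For context, a minimal runnable sketch of the `foreach` API this docstring describes; the SparkSession, the built-in `rate` source, and the timeout below are illustrative assumptions, not part of the patch:

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("foreach-sketch").getOrCreate()

class RowPrinter:
    def open(self, partition_id, epoch_id):
        # Called once per partition and epoch; return True to have process() invoked.
        # In micro-batch mode, (partition_id, epoch_id) can also serve as a dedup key.
        print("Opened partition %d, epoch %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        # Called for every row of the partition when open() returned True.
        print(row)

    def close(self, error):
        # Called after the rows are processed, with the error (if any) that was seen.
        print("Closed with error: %s" % str(error))

sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
query = sdf.writeStream.foreach(RowPrinter()).start()
query.awaitTermination(10)  # let it run briefly for illustration
query.stop()
```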

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r192920015
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -30,6 +30,7 @@
 from pyspark.sql.readwriter import OptionUtils, to_str
 from pyspark.sql.types import *
 from pyspark.sql.utils import StreamingQueryException
+from abc import ABCMeta, abstractmethod
--- End diff --

@tdas, Seems not used.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r192921149
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
--- End diff --

Hmm, I think `MapFromArrays` is fine.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r192920012
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(_, _), ArrayType(_, _)) =>
+TypeCheckResult.TypeCheckSuccess
+  case _ =>
+TypeCheckResult.TypeCheckFailure("The given two arguments should 
be an array")
+}
+  }
+
+  override def dataType: DataType = {
+MapType(
+  keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+  valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+  valueContainsNull = 
right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def nullSafeEval(keyArray: Any, valueArray: Any): Any = {
+val keyArrayData = keyArray.asInstanceOf[ArrayData]
+val valueArrayData = valueArray.asInstanceOf[ArrayData]
+if (keyArrayData.numElements != valueArrayData.numElements) {
+  throw new RuntimeException("The given two arrays should have the 
same length")
+}
+val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+if (leftArrayType.containsNull) {
+  if (keyArrayData.toArray(leftArrayType.elementType).contains(null)) {
+throw new RuntimeException("Cannot use null as map key!")
+  }
+}
+new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy())
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => {
+  val arrayBasedMapData = classOf[ArrayBasedMapData].getName
+  val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+  val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else 
{
+val leftArrayTypeTerm = ctx.addReferenceObj("leftArrayType", 
leftArrayType.elementType)
+val array = ctx.freshName("array")
+val i = ctx.freshName("i")
+s"""
+   |Object[] $array = 
$keyArrayData.toObjectArray($leftArrayTypeTerm);
+   |for (int $i = 0; $i < $array.length; $i++) {
+   |  if ($array[$i] == null) {
+   |throw new RuntimeException("Cannot use null as map key!");
+   |  }
+   |}
--- End diff --

I'm sorry, but I couldn't get it. I might miss something, but I thought we 
can simply do like:

```
for (int $i = 0; $i < $keyArrayData.numElements(); $i++) {
  if ($keyArrayData.isNullAt($i)) {
throw new RuntimeException("Cannot use null as map key!");
  }
}
```

Doesn't this work?



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r192920586
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
--- End diff --

Do we need this? I guess `ExpectsInputTypes` will throw an exception in that case.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r192920690
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(_, _), ArrayType(_, _)) =>
+TypeCheckResult.TypeCheckSuccess
+  case _ =>
+TypeCheckResult.TypeCheckFailure("The given two arguments should 
be an array")
+}
+  }
+
+  override def dataType: DataType = {
+MapType(
+  keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+  valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+  valueContainsNull = 
right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullable: Boolean = left.nullable || right.nullable
--- End diff --

This is the same as what `BinaryExpression` is already doing?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function

2018-06-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21061#discussion_r192919335
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -2189,3 +2189,302 @@ case class ArrayRemove(left: Expression, right: 
Expression)
 
   override def prettyName: String = "array_remove"
 }
+
+object ArraySetLike {
+  private val MAX_ARRAY_LENGTH: Int = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+val array = new Array[Int](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+if (useGenericArrayData(LongType.defaultSize, array.length)) {
+  new GenericArrayData(array)
+} else {
+  UnsafeArrayData.fromPrimitiveArray(array)
+}
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+val array = new Array[Long](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+if (useGenericArrayData(LongType.defaultSize, array.length)) {
+  new GenericArrayData(array)
+} else {
+  UnsafeArrayData.fromPrimitiveArray(array)
+}
+  }
+
+  def useGenericArrayData(elementSize: Int, length: Int): Boolean = {
--- End diff --

Ah, I see.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-06-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r192919160
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -308,6 +308,170 @@ case class MapEntries(child: Expression) extends 
UnaryExpression with ExpectsInp
   override def prettyName: String = "map_entries"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
+   [[1 -> "a"], [2 -> "c"], [3 -> "d"]
+  """, since = "2.4.0")
+case class MapConcat(children: Seq[Expression]) extends Expression {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+// check key types and value types separately to allow 
valueContainsNull to vary
+if (children.exists(!_.dataType.isInstanceOf[MapType])) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input of function $prettyName should all be of type 
map, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else if (children.map(_.dataType.asInstanceOf[MapType].keyType)
+  .exists(_.isInstanceOf[MapType])) {
+  // map_concat needs to pick a winner when multiple maps contain the 
same key. map_concat
+  // can do that only if it can detect when two keys are the same. 
SPARK-9415 states "map type
+  // should not support equality, hash". As a result, map_concat does 
not support a map type
+  // as a key
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName cannot have a map 
type as a key")
+} else if 
(children.map(_.dataType.asInstanceOf[MapType].keyType).distinct.length > 1) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName should all be the 
same type, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else if 
(children.map(_.dataType.asInstanceOf[MapType].valueType).distinct.length > 1) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName should all be the 
same type, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override def dataType: MapType = {
+MapType(
--- End diff --

How about doing `children.map(_.dataType.asInstanceOf[MapType])` first?
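
i.e. something like this rough sketch (adapted from the `dataType` shown elsewhere in this diff, not the author's final code), so the cast happens once:

```
override def dataType: MapType = {
  val mapTypes = children.map(_.dataType.asInstanceOf[MapType])
  MapType(
    keyType = mapTypes.headOption.map(_.keyType).getOrElse(StringType),
    valueType = mapTypes.headOption.map(_.valueType).getOrElse(StringType),
    valueContainsNull = mapTypes.exists(_.valueContainsNull))
}
```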


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-06-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r192918429
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -116,6 +117,161 @@ case class MapValues(child: Expression)
   override def prettyName: String = "map_values"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
+   [[1 -> "a"], [2 -> "c"], [3 -> "d"]
+  """, since = "2.4.0")
+case class MapConcat(children: Seq[Expression]) extends Expression {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+// check key types and value types separately to allow 
valueContainsNull to vary
+if (children.exists(!_.dataType.isInstanceOf[MapType])) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input of function $prettyName should all be of type 
map, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else if 
(children.map(_.dataType.asInstanceOf[MapType].keyType).distinct.length > 1) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName should all be the 
same type, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else if 
(children.map(_.dataType.asInstanceOf[MapType].valueType).distinct.length > 1) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName should all be the 
same type, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override def dataType: MapType = {
+MapType(
+  keyType = children.headOption
+
.map(_.dataType.asInstanceOf[MapType].keyType).getOrElse(StringType),
+  valueType = children.headOption
+
.map(_.dataType.asInstanceOf[MapType].valueType).getOrElse(StringType),
+  valueContainsNull = children.map { c =>
+c.dataType.asInstanceOf[MapType]
+  }.exists(_.valueContainsNull)
+)
+  }
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override def eval(input: InternalRow): Any = {
+val union = new util.LinkedHashMap[Any, Any]()
+children.map(_.eval(input)).foreach { raw =>
+  if (raw == null) {
+return null
+  }
+  val map = raw.asInstanceOf[MapData]
+  map.foreach(dataType.keyType, dataType.valueType, (k, v) =>
+union.put(k, v)
+  )
--- End diff --

@bersprockets Hi, thanks for the investigation. We don't need to care about 
key duplication for now, the same as `CreateMap`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-06-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r192918533
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -308,6 +308,170 @@ case class MapEntries(child: Expression) extends 
UnaryExpression with ExpectsInp
   override def prettyName: String = "map_entries"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
--- End diff --

nit: indent


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-06-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r192918569
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -308,6 +308,170 @@ case class MapEntries(child: Expression) extends 
UnaryExpression with ExpectsInp
   override def prettyName: String = "map_entries"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
+   [[1 -> "a"], [2 -> "c"], [3 -> "d"]
--- End diff --

The brackets don't match?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21493
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3808/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21493
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21493
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21493
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91470/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21493
  
**[Test build #91470 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91470/testReport)**
 for PR 21493 at commit 
[`a605a2d`](https://github.com/apache/spark/commit/a605a2dba4243e5f1526bc239fd6dbe88dd13ce9).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21493
  
**[Test build #91472 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91472/testReport)**
 for PR 21493 at commit 
[`15c087f`](https://github.com/apache/spark/commit/15c087fdf3f3f84732e084f7a327813065e678a1).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21366
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91466/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21366
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21366
  
**[Test build #91466 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91466/testReport)**
 for PR 21366 at commit 
[`c4b87d8`](https://github.com/apache/spark/commit/c4b87d81a520cec74cd1a39a79a3b31a708b95c9).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21045
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91465/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21045
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21045
  
**[Test build #91465 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91465/testReport)**
 for PR 21045 at commit 
[`0223960`](https://github.com/apache/spark/commit/02239609c2a6a22a306e631577862dc1a8736868).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21494: [WIP][SPARK-24375][Prototype] Support barrier scheduling

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21494
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3807/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21494: [WIP][SPARK-24375][Prototype] Support barrier scheduling

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21494
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21494: [WIP][SPARK-24375][Prototype] Support barrier scheduling

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21494
  
**[Test build #91471 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91471/testReport)**
 for PR 21494 at commit 
[`84cdc68`](https://github.com/apache/spark/commit/84cdc68c33e913335d8a0a26a388a32ba6e9cf18).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-04 Thread jiangxb1987
GitHub user jiangxb1987 opened a pull request:

https://github.com/apache/spark/pull/21494

[WIP][SPARK-24375][Prototype] Support barrier scheduling

## What changes were proposed in this pull request?

Add new RDDBarrier and BarrierTaskContext to support barrier scheduling in 
Spark, also modify how the job scheduling works a bit to accommodate the new 
feature.

This is a prototype to facilitate discussion; it's not meant to be the final 
design or anything, it just shows one way that might work.

## How was this patch tested?

Simple unit test and integration test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jiangxb1987/spark barrierSync

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21494.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21494


commit 6bdc5fe27dd2c28971e61119ef63dbee0385138d
Author: Xingbo Jiang 
Date:   2018-05-17T03:20:17Z

Implement support for barrier scheduling

commit 84cdc68c33e913335d8a0a26a388a32ba6e9cf18
Author: Xingbo Jiang 
Date:   2018-06-04T23:45:06Z

add TODOs




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21492: [SPARK-24300][ML] change the way to set seed in ml.clust...

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21492
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91469/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21492: [SPARK-24300][ML] change the way to set seed in ml.clust...

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21492
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21492: [SPARK-24300][ML] change the way to set seed in ml.clust...

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21492
  
**[Test build #91469 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91469/testReport)**
 for PR 21492 at commit 
[`fa2c422`](https://github.com/apache/spark/commit/fa2c42261a20a9c2a980889c175dd320c4c5d836).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21045
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91460/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21045
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21491: [SPARK-24453][SS] Fix error recovering from the failure ...

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21491
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91457/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21045
  
**[Test build #91460 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91460/testReport)**
 for PR 21045 at commit 
[`643cb9b`](https://github.com/apache/spark/commit/643cb9b80be8060be16514c4cfc31dba253d8bf2).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21491: [SPARK-24453][SS] Fix error recovering from the failure ...

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21491
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21493: [SPARK-15784] Add Power Iteration Clustering to s...

2018-06-04 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21493#discussion_r192910578
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
 ---
@@ -222,17 +167,13 @@ object PowerIterationClusteringSuite {
 val n = n1 + n2
 val points = genCircle(r1, n1) ++ genCircle(r2, n2)
 
-val rows = for (i <- 1 until n) yield {
-  val neighbors = for (j <- 0 until i) yield {
-j.toLong
+val rows = (for (i <- 1 until n) yield {
+  for (j <- 0 until i) yield {
+(i.toLong, j.toLong, sim(points(i), points(j)))
   }
-  val similarities = for (j <- 0 until i) yield {
-sim(points(i), points(j))
-  }
-  (i.toLong, neighbors.toArray, similarities.toArray)
-}
+}).flatMap(_.iterator)
 
-spark.createDataFrame(rows).toDF("id", "neighbors", "similarities")
+spark.createDataFrame(rows).toDF("src", "dst", "weight")
   }
 
 }
--- End diff --

Should test default weight.
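
For example, a hedged sketch of such a test (the name and assertions are assumptions, not part of this PR); leaving `weightCol` unset exercises the default `lit(1.0)` path:

```
test("power iteration clustering with default weight") {
  // No setWeightCol call, so every edge weight defaults to 1.0.
  val assignments = new PowerIterationClustering()
    .setK(2)
    .setMaxIter(40)
    .assignClusters(data.select("src", "dst"))
  assert(assignments.columns.toSet === Set("id", "cluster"))
  assert(assignments.count() > 0)
}
```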


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21493: [SPARK-15784] Add Power Iteration Clustering to s...

2018-06-04 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21493#discussion_r192909369
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
 ---
@@ -66,62 +65,35 @@ private[clustering] trait 
PowerIterationClusteringParams extends Params with Has
   def getInitMode: String = $(initMode)
 
   /**
-   * Param for the name of the input column for vertex IDs.
-   * Default: "id"
+   * Param for the name of the input column for source vertex IDs.
+   * Default: "src"
* @group param
*/
   @Since("2.4.0")
-  val idCol = new Param[String](this, "idCol", "Name of the input column 
for vertex IDs.",
+  val srcCol = new Param[String](this, "srcCol", "Name of the input column 
for source vertex IDs.",
 (value: String) => value.nonEmpty)
 
-  setDefault(idCol, "id")
+  setDefault(srcCol, "src")
 
   /** @group getParam */
   @Since("2.4.0")
-  def getIdCol: String = getOrDefault(idCol)
+  def getSrcCol: String = getOrDefault(srcCol)
 
   /**
-   * Param for the name of the input column for neighbors in the adjacency 
list representation.
-   * Default: "neighbors"
+   * Name of the input column for destination vertex IDs.
+   * Default: "dst"
* @group param
*/
   @Since("2.4.0")
-  val neighborsCol = new Param[String](this, "neighborsCol",
-"Name of the input column for neighbors in the adjacency list 
representation.",
+  val dstCol = new Param[String](this, "dstCol",
+"Name of the input column for destination vertex IDs.",
 (value: String) => value.nonEmpty)
 
-  setDefault(neighborsCol, "neighbors")
+  setDefault(dstCol, "dst")
--- End diff --

Could you put all default values in a single `setDefault` call?
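
i.e. roughly (a sketch covering only the params visible in this diff):

```
setDefault(srcCol -> "src", dstCol -> "dst")
```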


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21493: [SPARK-15784] Add Power Iteration Clustering to s...

2018-06-04 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21493#discussion_r192910421
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
 ---
@@ -62,136 +61,82 @@ class PowerIterationClusteringSuite extends 
SparkFunSuite
   new PowerIterationClustering().setInitMode("no_such_a_mode")
 }
 intercept[IllegalArgumentException] {
-  new PowerIterationClustering().setIdCol("")
+  new PowerIterationClustering().setSrcCol("")
 }
 intercept[IllegalArgumentException] {
-  new PowerIterationClustering().setNeighborsCol("")
-}
-intercept[IllegalArgumentException] {
-  new PowerIterationClustering().setSimilaritiesCol("")
+  new PowerIterationClustering().setDstCol("")
 }
   }
 
   test("power iteration clustering") {
 val n = n1 + n2
 
-val model = new PowerIterationClustering()
+val result = new PowerIterationClustering()
   .setK(2)
   .setMaxIter(40)
-val result = model.transform(data)
-
-val predictions = Array.fill(2)(mutable.Set.empty[Long])
-result.select("id", "prediction").collect().foreach {
-  case Row(id: Long, cluster: Integer) => predictions(cluster) += id
-}
-assert(predictions.toSet == Set((1 until n1).toSet, (n1 until 
n).toSet))
+  .setWeightCol("weight")
+  .assignClusters(data).as[(Long, Int)].collect().toSet
--- End diff --

it is better to split a long chain of methods.

~~~scala
val assignments = new ...
  ...
  .assignClusters(...)
val localAssignments = assignments
  .select('id, 'cluster) // need this since we didn't put a contract on column order
  .as[(Long, Int)]
  .collect()
  .toSet
~~~


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21493: [SPARK-15784] Add Power Iteration Clustering to s...

2018-06-04 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21493#discussion_r192909491
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
 ---
@@ -182,66 +137,59 @@ class PowerIterationClustering private[clustering] (
 
   /** @group setParam */
   @Since("2.4.0")
-  def setIdCol(value: String): this.type = set(idCol, value)
+  def setSrcCol(value: String): this.type = set(srcCol, value)
 
   /** @group setParam */
   @Since("2.4.0")
-  def setNeighborsCol(value: String): this.type = set(neighborsCol, value)
+  def setDstCol(value: String): this.type = set(dstCol, value)
 
   /** @group setParam */
   @Since("2.4.0")
-  def setSimilaritiesCol(value: String): this.type = set(similaritiesCol, 
value)
+  def setWeightCol(value: String): this.type = set(weightCol, value)
 
+  /**
+   * @param dataset A dataset with columns src, dst, weight representing 
the affinity matrix,
--- End diff --

Should explain what this method does first.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21493: [SPARK-15784] Add Power Iteration Clustering to s...

2018-06-04 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21493#discussion_r192909750
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
 ---
@@ -182,66 +137,59 @@ class PowerIterationClustering private[clustering] (
 
   /** @group setParam */
   @Since("2.4.0")
-  def setIdCol(value: String): this.type = set(idCol, value)
+  def setSrcCol(value: String): this.type = set(srcCol, value)
 
   /** @group setParam */
   @Since("2.4.0")
-  def setNeighborsCol(value: String): this.type = set(neighborsCol, value)
+  def setDstCol(value: String): this.type = set(dstCol, value)
 
   /** @group setParam */
   @Since("2.4.0")
-  def setSimilaritiesCol(value: String): this.type = set(similaritiesCol, 
value)
+  def setWeightCol(value: String): this.type = set(weightCol, value)
 
+  /**
+   * @param dataset A dataset with columns src, dst, weight representing 
the affinity matrix,
+   *which is the matrix A in the PIC paper. Suppose the 
src column value is i,
+   *the dst column value is j, and the weight column value is 
the similarity s,,ij,,, which
+   *must be nonnegative. This is a symmetric matrix and 
hence s,,ij,, = s,,ji,,.
+   *For any (i, j) with nonzero similarity, there should 
be either
+   *(i, j, s,,ij,,) or (j, i, s,,ji,,) in the input. Rows 
with i = j are ignored,
+   *because we assume s,,ij,, = 0.0.
+   * @return A dataset that contains columns of vertex id and the 
corresponding cluster for the id.
+   * The schema of it will be:
+   *  - id: Long
+   *  - cluster: Int
+   */
   @Since("2.4.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-transformSchema(dataset.schema, logging = true)
+  def assignClusters(dataset: Dataset[_]): DataFrame = {
+val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) {
+  lit(1.0)
+} else {
+  col($(weightCol)).cast(DoubleType)
+}
 
-val sparkSession = dataset.sparkSession
-val idColValue = $(idCol)
-val rdd: RDD[(Long, Long, Double)] =
-  dataset.select(
-col($(idCol)).cast(LongType),
-col($(neighborsCol)).cast(ArrayType(LongType, containsNull = 
false)),
-col($(similaritiesCol)).cast(ArrayType(DoubleType, containsNull = 
false))
-  ).rdd.flatMap {
-case Row(id: Long, nbrs: Seq[_], sims: Seq[_]) =>
-  require(nbrs.size == sims.size, s"The length of the neighbor ID 
list must be " +
-s"equal to the the length of the neighbor similarity list.  
Row for ID " +
-s"$idColValue=$id has neighbor ID list of length 
${nbrs.length} but similarity list " +
-s"of length ${sims.length}.")
-  
nbrs.asInstanceOf[Seq[Long]].zip(sims.asInstanceOf[Seq[Double]]).map {
-case (nbr, similarity) => (id, nbr, similarity)
-  }
-  }
+SchemaUtils.checkColumnTypes(dataset.schema, $(srcCol), 
Seq(IntegerType, LongType))
+SchemaUtils.checkColumnTypes(dataset.schema, $(dstCol), 
Seq(IntegerType, LongType))
+val rdd: RDD[(Long, Long, Double)] = dataset.select(
+  col($(srcCol)).cast(LongType),
+  col($(dstCol)).cast(LongType),
+  w).rdd.map {
+  case Row(src: Long, dst: Long, weight: Double) => (src, dst, weight)
+}
 val algorithm = new MLlibPowerIterationClustering()
   .setK($(k))
   .setInitializationMode($(initMode))
   .setMaxIterations($(maxIter))
 val model = algorithm.run(rdd)
 
-val predictionsRDD: RDD[Row] = model.assignments.map { assignment =>
+val assignmentsRDD: RDD[Row] = model.assignments.map { assignment =>
--- End diff --

`model.assignments.toDF` should work.
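
i.e. roughly (a sketch; assumes the session's implicits are brought into scope first):

```
val spark = dataset.sparkSession
import spark.implicits._

// Assignment(id: Long, cluster: Int) is a case class, so toDF() keeps the field names.
val assignmentsDF = model.assignments.toDF()
```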


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21491: [SPARK-24453][SS] Fix error recovering from the failure ...

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21491
  
**[Test build #91457 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91457/testReport)**
 for PR 21491 at commit 
[`14a2fca`](https://github.com/apache/spark/commit/14a2fca01ff79d8ea338718d075a98ef2739).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21493
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3806/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21045
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91461/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21045
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21493: [SPARK-15784] Add Power Iteration Clustering to spark.ml

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21493
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21045
  
**[Test build #91461 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91461/testReport)**
 for PR 21045 at commit 
[`5876082`](https://github.com/apache/spark/commit/5876082bb1828e51ca7a157d7213ff9a548d34b4).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21491: [SPARK-24453][SS] Fix error recovering from the failure ...

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21491
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91456/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21491: [SPARK-24453][SS] Fix error recovering from the failure ...

2018-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21491
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21491: [SPARK-24453][SS] Fix error recovering from the failure ...

2018-06-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21491
  
**[Test build #91456 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91456/testReport)**
 for PR 21491 at commit 
[`e6b12e6`](https://github.com/apache/spark/commit/e6b12e64d7cfffad7e50c7c46b9604ea39a781cb).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


