[GitHub] spark issue #21209: [SPARK-24141][CORE] Fix bug in CoarseGrainedSchedulerBac...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21209
  
Can one of the admins verify this patch?


---




[GitHub] spark pull request #21209: [SPARK-24141][CORE] Fix bug in CoarseGrainedSched...

2018-05-01 Thread Ngone51
GitHub user Ngone51 opened a pull request:

https://github.com/apache/spark/pull/21209

[SPARK-24141][CORE] Fix bug in CoarseGrainedSchedulerBackend.killExecutors

## What changes were proposed in this pull request?

In `CoarseGrainedSchedulerBackend.killExecutors()`, `numPendingExecutors` should be increased by `executorsToKill.size` rather than `knownExecutors.size` when we do not adjust the target number of executors.
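
To make the accounting concrete, here is a minimal, self-contained sketch (hypothetical names and shapes, not the upstream method): `executorsToKill` can be a strict subset of `knownExecutors`, so counting the latter overstates the pending replacements.

```scala
// Sketch only: a stand-in for the relevant bookkeeping in killExecutors().
object KillExecutorsSketch {
  var numPendingExecutors = 0

  def recordKills(
      knownExecutors: Seq[String],   // executors the backend recognizes
      executorsToKill: Seq[String],  // the subset that passes the kill checks
      adjustTargetNumExecutors: Boolean): Unit = {
    if (!adjustTargetNumExecutors) {
      // Before the fix this added knownExecutors.size, over-counting whenever
      // some known executors were filtered out of the kill list.
      numPendingExecutors += executorsToKill.size
    }
  }
}
```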

## How was this patch tested?

N/A


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ngone51/spark SPARK-24141

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21209.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21209


commit 264f316c178ff32ea632cc3db7e20ab68d555b85
Author: wuyi 
Date:   2018-05-02T01:50:01Z

fix a bug in killExecutors




---




[GitHub] spark pull request #21021: [SPARK-23921][SQL] Add array_sort function

2018-05-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21021#discussion_r185378543
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -191,28 +161,202 @@ case class SortArray(base: Expression, ascendingOrder: Expression)
             if (o1 == null && o2 == null) {
               0
             } else if (o1 == null) {
-              1
+              -nullOrder
             } else if (o2 == null) {
-              -1
+              nullOrder
             } else {
               -ordering.compare(o1, o2)
             }
           }
         }
       }
 
-  override def nullSafeEval(array: Any, ascending: Any): Any = {
-    val elementType = base.dataType.asInstanceOf[ArrayType].elementType
+  def elementType: DataType = arrayExpression.dataType.asInstanceOf[ArrayType].elementType
+
+  def sortEval(array: Any, ascending: Boolean): Any = {
     val data = array.asInstanceOf[ArrayData].toArray[AnyRef](elementType)
     if (elementType != NullType) {
-      java.util.Arrays.sort(data, if (ascending.asInstanceOf[Boolean]) lt else gt)
+      java.util.Arrays.sort(data, if (ascending) lt else gt)
     }
     new GenericArrayData(data.asInstanceOf[Array[Any]])
   }
 
+  def sortCodegen(ctx: CodegenContext, ev: ExprCode, base: String, order: String): String = {
+    val arrayData = classOf[ArrayData].getName
+    val genericArrayData = classOf[GenericArrayData].getName
+    val array = ctx.freshName("array")
+    val c = ctx.freshName("c")
+    val dataTypes = elementType match {
+      case DecimalType.Fixed(p, s) =>
+        s"org.apache.spark.sql.types.DataTypes.createDecimalType($p, $s)"
+      case ArrayType(et, cn) =>
+        val dt = s"org.apache.spark.sql.types.$et$$.MODULE$$"
+        s"org.apache.spark.sql.types.DataTypes.createArrayType($dt, $cn)"
+      case StructType(f) =>
+        "org.apache.spark.sql.types.StructType$.MODULE$." +
+          s"apply(new java.util.ArrayList(${f.length}))"
+      case _ =>
+        s"org.apache.spark.sql.types.$elementType$$.MODULE$$"
+    }
--- End diff --

Definitely. I added some complex test cases with nested types.
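
For readers skimming the diff, the null-ordering pattern above can be distilled into a standalone comparator (a sketch; a `nullOrder` of +1 or -1 is an assumption for illustration):

```scala
import java.util.Comparator

// Sketch of the pattern in the diff: one nullOrder value decides whether nulls
// sort before or after non-null elements, mirroring -nullOrder / nullOrder above.
object NullOrderSketch {
  def nullAwareComparator(nullOrder: Int, ordering: Comparator[AnyRef]): Comparator[AnyRef] =
    new Comparator[AnyRef] {
      override def compare(o1: AnyRef, o2: AnyRef): Int =
        if (o1 == null && o2 == null) 0
        else if (o1 == null) -nullOrder
        else if (o2 == null) nullOrder
        else ordering.compare(o1, o2)
    }
}
```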


---




[GitHub] spark issue #21021: [SPARK-23921][SQL] Add array_sort function

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21021
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21021: [SPARK-23921][SQL] Add array_sort function

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21021
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2784/
Test PASSed.


---




[GitHub] spark issue #21021: [SPARK-23921][SQL] Add array_sort function

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21021
  
**[Test build #90021 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90021/testReport)** for PR 21021 at commit [`9f63a76`](https://github.com/apache/spark/commit/9f63a766dc7308c564a7d59cbad58ee8c0a15faa).


---




[GitHub] spark pull request #21173: [SPARK-23856][SQL] Add an option `queryTimeout` i...

2018-05-01 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21173#discussion_r185376634
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala ---
@@ -89,6 +89,10 @@ class JDBCOptions(
   // the number of partitions
   val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt)
 
+  // the number of seconds the driver will wait for a Statement object to execute.
+  // Zero means there is no limit.
+  val queryTimeout = parameters.getOrElse(JDBC_QUERY_TIMEOUT, "0").toInt
--- End diff --

You mean here?

https://github.com/apache/spark/blob/6782359a04356e4cde32940861bf2410ef37f445/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L257

I feel it's enough to update the doc for this parameter.
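
For context, this is how the option would surface to users (a sketch; the option name comes from this PR, while the URL, table, and the `spark` session are illustrative):

```scala
// Assumes an existing SparkSession named `spark`.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db.example.com/metrics") // hypothetical
  .option("dbtable", "events")                               // hypothetical
  .option("queryTimeout", "10") // seconds; "0" (the default) means no limit
  .load()
```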


---




[GitHub] spark issue #21187: [SPARK-24035][SQL] SQL syntax for Pivot

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21187
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21187: [SPARK-24035][SQL] SQL syntax for Pivot

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21187
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90012/
Test FAILed.


---




[GitHub] spark issue #21187: [SPARK-24035][SQL] SQL syntax for Pivot

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21187
  
**[Test build #90012 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90012/testReport)** for PR 21187 at commit [`171c0c2`](https://github.com/apache/spark/commit/171c0c27d1ed79c7df7fe32c5ac0262096315273).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21108: [SPARK-24027][SQL] Support MapType with StringType for k...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21108
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21108: [SPARK-24027][SQL] Support MapType with StringType for k...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21108
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90013/
Test FAILed.


---




[GitHub] spark issue #21108: [SPARK-24027][SQL] Support MapType with StringType for k...

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21108
  
**[Test build #90013 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90013/testReport)** for PR 21108 at commit [`840a3a1`](https://github.com/apache/spark/commit/840a3a14890cb9a03efb806d7f07025618abc604).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21073
  
**[Test build #90020 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90020/testReport)** for PR 21073 at commit [`d9dccd3`](https://github.com/apache/spark/commit/d9dccd3c96a5d1ffdfce0225828af21bd1ee8303).


---




[GitHub] spark issue #21182: [SPARK-24068] Propagating DataFrameReader's options to T...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21182
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90010/
Test FAILed.


---




[GitHub] spark issue #21182: [SPARK-24068] Propagating DataFrameReader's options to T...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21182
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21182: [SPARK-24068] Propagating DataFrameReader's options to T...

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21182
  
**[Test build #90010 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90010/testReport)** for PR 21182 at commit [`9f55aa8`](https://github.com/apache/spark/commit/9f55aa8fa9589bd23fddfe83f9badff02492e69c).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-05-01 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21073
  
retest this please


---




[GitHub] spark issue #21206: [SPARK-24133][SQL] Check for integer overflows when resi...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21206
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21206: [SPARK-24133][SQL] Check for integer overflows when resi...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21206
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90015/
Test FAILed.


---




[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21082
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90007/
Test FAILed.


---




[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21082
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21206: [SPARK-24133][SQL] Check for integer overflows when resi...

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21206
  
**[Test build #90015 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90015/testReport)** for PR 21206 at commit [`ddfd147`](https://github.com/apache/spark/commit/ddfd1475d1ca3cc185ba298ae5927525e766fc73).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21082
  
**[Test build #90007 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90007/testReport)** for PR 21082 at commit [`abfa42b`](https://github.com/apache/spark/commit/abfa42b61344ee43db256dd3d382c41e1868bee1).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21197: [SPARK-23971] [BACKPORT-2.3] Should not leak Spark sessi...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21197
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21197: [SPARK-23971] [BACKPORT-2.3] Should not leak Spark sessi...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21197
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90016/
Test FAILed.


---




[GitHub] spark issue #21197: [SPARK-23971] [BACKPORT-2.3] Should not leak Spark sessi...

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21197
  
**[Test build #90016 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90016/testReport)** for PR 21197 at commit [`c1c6377`](https://github.com/apache/spark/commit/c1c63776687eef0be8f2675b3c247f793b9ea05e).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21173: [SPARK-23856][SQL] Add an option `queryTimeout` i...

2018-05-01 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21173#discussion_r185373655
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala ---
@@ -1190,4 +1190,20 @@ class JDBCSuite extends SparkFunSuite
       assert(sql("select * from people_view").schema === schema)
     }
   }
+
+  test("SPARK-23856 Spark jdbc setQueryTimeout option") {
+    val numJoins = 100
+    val longRunningQuery =
--- End diff --

ok, I'll add tests for that.
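
A rough shape such a test could take (a sketch under assumptions: ScalaTest style as in `JDBCSuite`, a hypothetical `urlWithUserAndPass` fixture, and repeated self-joins to make the query slow):

```scala
test("SPARK-23856 Spark jdbc setQueryTimeout option") {
  val numJoins = 100
  // Cross-join the same table repeatedly so the query comfortably outlives
  // a one-second timeout.
  val longRunningQuery = (1 to numJoins)
    .map(i => s"TEST.PEOPLE p$i")
    .mkString("SELECT COUNT(*) FROM ", " CROSS JOIN ", "")
  val e = intercept[Exception] {
    spark.read.format("jdbc")
      .option("url", urlWithUserAndPass)          // hypothetical fixture
      .option("dbtable", s"($longRunningQuery) t")
      .option("queryTimeout", "1")                // seconds
      .load()
      .collect()
  }
  assert(e.getMessage != null) // expect the driver to abort with a timeout error
}
```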


---




[GitHub] spark issue #13493: [SPARK-15750][MLLib][PYSPARK] Constructing FPGrowth fail...

2018-05-01 Thread zjffdu
Github user zjffdu commented on the issue:

https://github.com/apache/spark/pull/13493
  
Thanks @jkbradley. The failed tests seem unrelated.


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185365085
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+    sc: SparkContext,
+    sqlContext: SQLContext,
+    @transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] =
+    mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+    readerFactories.zipWithIndex.map {
+      case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory)
+    }.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+    // If attempt number isn't 0, this is a task retry, which we don't support.
+    if (context.attemptNumber() != 0) {
+      throw new ContinuousTaskRetryException()
+    }
+
+    val readerForPartition = dataReaders.synchronized {
+      if (!dataReaders.contains(split)) {
+        dataReaders.put(
+          split,
+          new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs))
+      }
+
+      dataReaders(split)
+    }
+
+    val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+    val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get)
+    new Iterator[UnsafeRow] {
+      private val POLL_TIMEOUT_MS = 1000
+
+      private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+      override def hasNext(): Boolean = {
+        while (currentEntry == null) {
+          if (context.isInterrupted() || context.isCompleted()) {
+            // Force the epoch to end here. The writer will notice the context is interrupted
+            // or completed and not start a new one. This makes it possible to achieve clean
+            // shutdown of the streaming query.
+            // TODO: The obvious generalization of this logic to multiple stages won't work. It's
+            // invalid to send an epoch marker from the bottom of a task if all its child tasks
+            // haven't sent one.
+            currentEntry = (null, null)

[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185371525
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo}
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.util.Utils
+
+class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactory[InternalRow])
--- End diff --

add docs 


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185372507
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo}
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.util.Utils
+
+class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactory[InternalRow])
+    extends RDD[Unit](prev) {
+
+  override val partitioner = prev.partitioner
+
+  override def getPartitions: Array[Partition] = prev.partitions
+
+  override def compute(split: Partition, context: TaskContext): Iterator[Unit] = {
+    val epochCoordinator = EpochCoordinatorRef.get(
+      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+      SparkEnv.get)
+    var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+    do {
+      var dataWriter: DataWriter[InternalRow] = null
+      // write the data and commit this writer.
+      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+        try {
+          val dataIterator = prev.compute(split, context)
+          dataWriter = writeTask.createDataWriter(
+            context.partitionId(), context.attemptNumber(), currentEpoch)
+          while (dataIterator.hasNext) {
+            dataWriter.write(dataIterator.next())
+          }
+          logInfo(s"Writer for partition ${context.partitionId()} " +
+            s"in epoch $currentEpoch is committing.")
+          val msg = dataWriter.commit()
+          epochCoordinator.send(
+            CommitPartitionEpoch(context.partitionId(), currentEpoch, msg)
+          )
+          logInfo(s"Writer for partition ${context.partitionId()} " +
+            s"in epoch $currentEpoch committed.")
+          currentEpoch += 1
--- End diff --

I am having trouble tracking how `currentEpoch` is updated and used. Is this field used anywhere outside this class? The `ContinuousQueuedDataReader` also increments a `currentEpoch`, so I am confused about what is used where.

Can't we converge the different fields into a common thread-local variable that is initialized from the local property, incremented in one place (say, by this writer class), and used everywhere?
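
A minimal sketch of that suggestion (a hypothetical object; the PR may well end up doing this differently):

```scala
// One thread-local epoch counter: initialized from the task's local property,
// incremented in exactly one place, read everywhere else.
object EpochTrackerSketch {
  private val currentEpoch = new ThreadLocal[java.lang.Long]

  // Must be called on the task thread before any get/increment.
  def initialize(startEpoch: Long): Unit = currentEpoch.set(startEpoch)
  def getCurrentEpoch: Long = currentEpoch.get()
  def incrementCurrentEpoch(): Unit = currentEpoch.set(currentEpoch.get() + 1)
}
```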


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185351224
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+    sc: SparkContext,
+    sqlContext: SQLContext,
+    @transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
--- End diff --

I think it's a bad idea to have sqlContext in the constructor, because this can accidentally serialize the SQLContext, and we don't want that. In fact, if you only need a few confs from the SQLContext, then maybe we should only have those in the constructor.
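
Concretely, the constructor could take just the two values (a sketch, not the PR's code):

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Sketch: nothing non-serializable is captured, because only the two conf
// values travel with the RDD. Reader wiring is elided.
class ContinuousDataSourceRDDSketch(
    sc: SparkContext,
    dataQueueSize: Int,
    epochPollIntervalMs: Long)
  extends RDD[Unit](sc, Nil) {

  override protected def getPartitions: Array[Partition] = Array.empty[Partition]

  override def compute(split: Partition, context: TaskContext): Iterator[Unit] =
    Iterator.empty
}
```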


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185364497
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.Closeable
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A wrapper for a continuous processing data reader, including a reading queue and epoch markers.
+ *
+ * This will be instantiated once per partition - successive calls to compute() in the
+ * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of
+ * offsets across epochs.
+ *
+ * For performance reasons, this is very weakly encapsulated. There are six handles for the RDD:
+ *  * currentOffset - contains the offset of the most recent row which a compute() iterator has sent
+ *    upwards. The RDD is responsible for advancing this.
+ *  * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing
+ *    this before ending the compute() iterator.
+ *  * queue - the queue of incoming rows (row, offset) or epoch markers (null, null). The
+ *    ContinuousQueuedDataReader writes into this queue, and RDD.compute() will read from it.
+ *  * {epochPoll|dataReader}Failed - flags to check if the epoch poll and data reader threads are
+ *    still running. These threads won't be restarted if they fail, so the RDD should intercept
+ *    this state when convenient to fail the query.
+ *  * close() - to close this reader when the query is going to shut down.
+ */
+class ContinuousQueuedDataReader(
+    split: Partition,
+    context: TaskContext,
+    dataQueueSize: Int,
+    epochPollIntervalMs: Long) extends Closeable {
+  private val reader = split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]]
+    .readerFactory.createDataReader()
+
+  // Important sequencing - we must get our starting point before the provider threads start running
+  var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+  var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  // This queue contains two types of messages:
+  // * (null, null) representing an epoch boundary.
+  // * (row, off) containing a data row and its corresponding PartitionOffset.
+  val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
+
+  val epochPollFailed = new AtomicBoolean(false)
+  val dataReaderFailed = new AtomicBoolean(false)
+
+  private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+
+  private val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+    s"epoch-poll--$coordinatorId--${context.partitionId()}")
+  val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
--- End diff --

Why is this a public val?


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185365453
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.Closeable
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A wrapper for a continuous processing data reader, including a reading queue and epoch markers.
+ *
+ * This will be instantiated once per partition - successive calls to compute() in the
+ * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of
+ * offsets across epochs.
+ *
+ * For performance reasons, this is very weakly encapsulated. There are six handles for the RDD:
--- End diff --
--- End diff --

This can be much more strongly encapsulated. There is no need to expose 
`queue`, `epochPollFailed` and `dataReaderFailed`. See comment in the RDD class.


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185357003
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+    sc: SparkContext,
+    sqlContext: SQLContext,
+    @transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] =
--- End diff --

Does this whole reader map need to be serialized for every task? Because as it is now, this whole map is going to be serialized for every task. Per-partition objects like this should be passed through the RDDPartition object.
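
For illustration, the suggested shape might be (a sketch; the class name is a stand-in, and it assumes the PR's `ContinuousQueuedDataReader`):

```scala
import org.apache.spark.Partition

// The per-partition reader lives on the Partition object itself; @transient
// keeps it out of task serialization, and it is created lazily on the executor.
class ContinuousDataSourceRDDPartitionSketch(override val index: Int) extends Partition {
  // Set on the executor on first compute() for this partition; never shipped.
  @transient var queuedReader: ContinuousQueuedDataReader = _
}
```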


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185371024
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.Closeable
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A wrapper for a continuous processing data reader, including a reading queue and epoch markers.
+ *
+ * This will be instantiated once per partition - successive calls to compute() in the
+ * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of
+ * offsets across epochs.
+ *
+ * For performance reasons, this is very weakly encapsulated. There are six handles for the RDD:
+ *  * currentOffset - contains the offset of the most recent row which a compute() iterator has sent
+ *    upwards. The RDD is responsible for advancing this.
+ *  * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing
+ *    this before ending the compute() iterator.
+ *  * queue - the queue of incoming rows (row, offset) or epoch markers (null, null). The
+ *    ContinuousQueuedDataReader writes into this queue, and RDD.compute() will read from it.
+ *  * {epochPoll|dataReader}Failed - flags to check if the epoch poll and data reader threads are
+ *    still running. These threads won't be restarted if they fail, so the RDD should intercept
+ *    this state when convenient to fail the query.
+ *  * close() - to close this reader when the query is going to shut down.
+ */
+class ContinuousQueuedDataReader(
--- End diff --

This class is best understood only when you see both the `DataReaderThread` and `EpochPollRunnable` code, and these classes share a lot of objects between themselves (flags, the TaskContext, etc.). So I think it makes more sense to have `DataReaderThread` and `EpochPollRunnable` as inner classes of this `ContinuousQueuedDataReader` class. That would make the logic easier to follow.
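
Roughly along these lines (a sketch with stand-in element types, not the PR's code):

```scala
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

// Both worker threads become inner classes, so the queue and failure flag they
// share are plain fields of the enclosing reader instead of constructor plumbing.
class QueuedDataReaderSketch(queueSize: Int) {
  private val queue = new ArrayBlockingQueue[String](queueSize)
  private val failed = new AtomicBoolean(false)

  private class DataReaderThread extends Thread {
    override def run(): Unit =
      try queue.put("row") // stand-in for pushing a (row, offset) pair
      catch { case _: InterruptedException => failed.set(true) }
  }

  private class EpochMarkerGenerator extends Runnable {
    override def run(): Unit = queue.offer("epoch-marker") // stand-in for (null, null)
  }
}
```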


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185365888
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/DataReaderThread.scala ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.sources.v2.reader.DataReader
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
+
+/**
+ * The data component of [[ContinuousQueuedDataReader]]. Pushes (row, offset) to the queue when
+ * a new row arrives to the [[DataReader]].
+ */
+class DataReaderThread(
--- End diff --

Rename to ContinuousDataReaderThread.


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185356454
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+    sc: SparkContext,
+    sqlContext: SQLContext,
+    @transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] =
+    mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+    readerFactories.zipWithIndex.map {
+      case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory)
+    }.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+    // If attempt number isn't 0, this is a task retry, which we don't support.
+    if (context.attemptNumber() != 0) {
+      throw new ContinuousTaskRetryException()
+    }
+
+    val readerForPartition = dataReaders.synchronized {
+      if (!dataReaders.contains(split)) {
+        dataReaders.put(
+          split,
+          new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs))
+      }
+
+      dataReaders(split)
+    }
+
+    val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+    val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get)
+    new Iterator[UnsafeRow] {
+      private val POLL_TIMEOUT_MS = 1000
+
+      private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+      override def hasNext(): Boolean = {
+        while (currentEntry == null) {
+          if (context.isInterrupted() || context.isCompleted()) {
+            // Force the epoch to end here. The writer will notice the context is interrupted
+            // or completed and not start a new one. This makes it possible to achieve clean
+            // shutdown of the streaming query.
+            // TODO: The obvious generalization of this logic to multiple stages won't work. It's
--- End diff --

It's hard to make sense of what this means. What is the "bottom of a task"?


---


[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185369491
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochPollRunnable.scala ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
+
+/**
+ * The epoch marker component of [[ContinuousQueuedDataReader]]. Populates the queue with
+ * (null, null) when a new epoch marker arrives.
+ */
+class EpochPollRunnable(
--- End diff --

It's hard to understand what "EpochPollRunnable" means. How about "EpochMarkerGenerator" instead?


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185364276
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.Closeable
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A wrapper for a continuous processing data reader, including a reading queue and epoch markers.
+ *
+ * This will be instantiated once per partition - successive calls to compute() in the
+ * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of
+ * offsets across epochs.
+ *
+ * For performance reasons, this is very weakly encapsulated. There are six handles for the RDD:
+ *  * currentOffset - contains the offset of the most recent row which a compute() iterator has sent
+ *    upwards. The RDD is responsible for advancing this.
+ *  * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing
+ *    this before ending the compute() iterator.
+ *  * queue - the queue of incoming rows (row, offset) or epoch markers (null, null). The
+ *    ContinuousQueuedDataReader writes into this queue, and RDD.compute() will read from it.
+ *  * {epochPoll|dataReader}Failed - flags to check if the epoch poll and data reader threads are
+ *    still running. These threads won't be restarted if they fail, so the RDD should intercept
+ *    this state when convenient to fail the query.
+ *  * close() - to close this reader when the query is going to shut down.
+ */
+class ContinuousQueuedDataReader(
+    split: Partition,
+    context: TaskContext,
+    dataQueueSize: Int,
+    epochPollIntervalMs: Long) extends Closeable {
+  private val reader = split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]]
+    .readerFactory.createDataReader()
+
+  // Important sequencing - we must get our starting point before the provider threads start running
+  var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
--- End diff --

How is this synchronized? Isn't it accessed from both the task iterator thread and the data reader thread?
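
One way the visibility half of this could be handled (an assumption on my part, not the PR's answer) is to publish the field as volatile:

```scala
// Written by the data reader thread, read by the task's iterator thread;
// @volatile guarantees the latest write is visible across threads.
class OffsetHolderSketch {
  @volatile var currentOffset: Long = -1L // Long stands in for PartitionOffset
}
```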


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185369742
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochPollRunnable.scala ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
+
+/**
+ * The epoch marker component of [[ContinuousQueuedDataReader]]. Populates 
the queue with
+ * (null, null) when a new epoch marker arrives.
+ */
+class EpochPollRunnable(
+queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
+context: TaskContext,
+failedFlag: AtomicBoolean)
+  extends Thread with Logging {
+  private[continuous] var failureReason: Throwable = _
+
+  private val epochEndpoint = EpochCoordinatorRef.get(
+
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), 
SparkEnv.get)
+  // Note that this is *not* the same as the currentEpoch in 
[[ContinuousQueuedDataReader]]! That
+  // field represents the epoch wrt the data being processed. The 
currentEpoch here is just a
+  // counter to ensure we send the appropriate number of markers if we 
fall behind the driver.
+  private var currentEpoch = 
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  override def run(): Unit = {
+try {
+  val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch)
+  for (i <- currentEpoch to newEpoch - 1) {
--- End diff --

I strongly suggest adding more docs here to explain this logic. 
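
For reference, a hedged sketch of the requested documentation; the loop body 
and bookkeeping are reconstructed from the class doc ("Populates the queue 
with (null, null) when a new epoch marker arrives") and are assumptions, not 
the PR's verbatim code:

```scala
override def run(): Unit = {
  try {
    // Ask the driver-side coordinator for the latest global epoch.
    val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch)
    // The driver may have advanced several epochs since the last poll.
    // Enqueue one (null, null) marker per missed epoch so the consuming
    // iterator still observes exactly one boundary per epoch.
    for (i <- currentEpoch to newEpoch - 1) {
      queue.put((null, null))
    }
    currentEpoch = newEpoch
  } catch {
    case t: Throwable =>
      // This thread is not restarted on failure; record the cause so
      // the RDD can surface it and fail the query.
      failureReason = t
      failedFlag.set(true)
      throw t
  }
}
```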


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185356694
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
--- End diff --

Please add docs on what this method does. This is a large method, and 
breaking it down into smaller internal methods may be beneficial (or at least 
documenting the sections).
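
A hedged sketch of the kind of decomposition being suggested; every helper 
name here is hypothetical, not the PR's actual code:

```scala
override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
  // Continuous tasks are never retried; fail fast on a retry attempt.
  if (context.attemptNumber() != 0) throw new ContinuousTaskRetryException()
  // Reuse (or lazily create) the per-partition reader for offset continuity.
  val reader = readerForPartition(split, context)  // hypothetical helper
  // Drain reader.queue until an epoch marker, reporting offsets upward.
  pollingIterator(reader, context)                 // hypothetical helper
}
```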


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185370511
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.Closeable
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A wrapper for a continuous processing data reader, including a reading 
queue and epoch markers.
+ *
+ * This will be instantiated once per partition - successive calls to 
compute() in the
+ * [[ContinuousDataSourceRDD]] will reuse the same reader. This is 
required to get continuity of
+ * offsets across epochs.
+ *
+ * For performance reasons, this is very weakly encapsulated. There are 
six handles for the RDD:
+ *  * currentOffset - contains the offset of the most recent row which a 
compute() iterator has sent
+ *upwards. The RDD is responsible for advancing this.
+ *  * currentEpoch - the epoch which is currently occurring. The RDD is 
responsible for incrementing
+ *this before ending the compute() iterator.
+ *  * queue - the queue of incoming rows (row, offset) or epoch markers 
(null, null). The
+ *ContinuousQueuedDataReader writes into this queue, and RDD.compute() 
will read from it.
+ *  * {epochPoll|dataReader}Failed - flags to check if the epoch poll and 
data reader threads are
+ *still running. These threads won't be restarted if they fail, so the 
RDD should intercept
+ *this state when convenient to fail the query.
+ *  * close() - to close this reader when the query is going to shut down.
+ */
+class ContinuousQueuedDataReader(
+split: Partition,
+context: TaskContext,
+dataQueueSize: Int,
+epochPollIntervalMs: Long) extends Closeable {
+  private val reader = 
split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]]
+.readerFactory.createDataReader()
+
+  // Important sequencing - we must get our starting point before the 
provider threads start running
+  var currentOffset: PartitionOffset = 
ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+  var currentEpoch: Long = 
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  // This queue contains two types of messages:
+  // * (null, null) representing an epoch boundary.
+  // * (row, off) containing a data row and its corresponding 
PartitionOffset.
+  val queue = new ArrayBlockingQueue[(UnsafeRow, 
PartitionOffset)](dataQueueSize)
--- End diff --

Commented above, this does not need to be public. 
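
A hedged sketch of one way to narrow the surface (names are illustrative, 
not the PR's code):

```scala
// Keep the queue private and expose a single blocking accessor, so the
// (row, offset) / (null, null) protocol stays an implementation detail.
private val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)

/** Blocks until the next entry; (null, null) marks an epoch boundary. */
def nextEntry(): (UnsafeRow, PartitionOffset) = queue.take()
```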



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185352237
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
--- End diff --

Why not just extend DataSourceRDD? That would deduplicate quite a bit of the 
code related to `getPartitions` and `preferredLocations`.
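
A hedged sketch of what that might look like, assuming DataSourceRDD's 
constructor takes (sc, readerFactories) and already implements 
getPartitions and preferredLocations (not verified against the PR):

```scala
class ContinuousDataSourceRDD(
    sc: SparkContext,
    dataQueueSize: Int,
    epochPollIntervalMs: Long,
    readerFactories: Seq[DataReaderFactory[UnsafeRow]])
  extends DataSourceRDD[UnsafeRow](sc, readerFactories) {
  // Only compute() would need to be overridden for continuous processing.
}
```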


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185358647
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+// If attempt number isn't 0, this is a task retry, which we don't 
support.
+if (context.attemptNumber() != 0) {
+  throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+  if (!dataReaders.contains(split)) {
+dataReaders.put(
+  split,
+  new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
+  }
+
+  dataReaders(split)
+}
+
+val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private val POLL_TIMEOUT_MS = 1000
+
+  private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+  override def hasNext(): Boolean = {
+while (currentEntry == null) {
+  if (context.isInterrupted() || context.isCompleted()) {
+// Force the epoch to end here. The writer will notice the 
context is interrupted
+// or completed and not start a new one. This makes it 
possible to achieve clean
+// shutdown of the streaming query.
+// TODO: The obvious generalization of this logic to multiple 
stages won't work. It's
+// invalid to send an epoch marker from the bottom of a task 
if all its child tasks
+// haven't sent one.
+currentEntry = (null, null)
+   

[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185353202
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+// If attempt number isn't 0, this is a task retry, which we don't 
support.
+if (context.attemptNumber() != 0) {
+  throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+  if (!dataReaders.contains(split)) {
+dataReaders.put(
+  split,
+  new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
+  }
+
+  dataReaders(split)
+}
+
+val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
+new Iterator[UnsafeRow] {
--- End diff --

It might be slightly cleaner to implement this using 
spark.util.NextIterator.
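
For reference, a hedged sketch of the shape org.apache.spark.util.NextIterator 
enables, assuming the reader exposes the blocking queue of (row, offset) 
pairs described above (details are illustrative, not the PR's code):

```scala
new NextIterator[UnsafeRow] {
  override def getNext(): UnsafeRow = {
    readerForPartition.queue.take() match {
      case (null, null) =>  // epoch marker: end this compute() pass
        finished = true
        null
      case (row, offset) =>
        readerForPartition.currentOffset = offset
        row
    }
  }
  override def close(): Unit = {}
}
```

NextIterator centralizes the hasNext/next bookkeeping, so the 
epoch-boundary handling lives in one place instead of being spread across 
hasNext() and next().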


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21196: [SPARK-24123][SQL] Fix precision issues in monthsBetween...

2018-05-01 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/21196
  
The SparkR failure is unrelated to this PR and occurs in several other PRs. We 
had better wait for SparkR to be fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21178: [SPARK-24110][Thrift-Server] Avoid UGI.loginUserFromKeyt...

2018-05-01 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/21178
  
@mridulm, can you review again? Thanks a lot.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20929: [SPARK-23772][SQL][WIP] Provide an option to ignore colu...

2018-05-01 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/20929
  
Oh, sorry, I'll update in a few days.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning

2018-05-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/16578
  
To ensure PR and review quality, we normally avoid doing everything in 
a single huge PR. It would be much better if you could split it into a few 
smaller PRs. Both @cloud-fan and I think separating the optimizer rules makes 
sense. WDYT?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21050: [SPARK-23912][SQL]add array_distinct

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21050
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21050: [SPARK-23912][SQL]add array_distinct

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21050
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90003/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21050: [SPARK-23912][SQL]add array_distinct

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21050
  
**[Test build #90003 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90003/testReport)**
 for PR 21050 at commit 
[`e57056b`](https://github.com/apache/spark/commit/e57056bf89131ade3883cde56fc051c121ef1f77).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21188: [SPARK-24046][SS] Fix rate source rowsPerSecond <= rampU...

2018-05-01 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/21188
  
Isn't this a flat ramp-up that smoothly increases the rows per second? Your 
proposal is another solution, but these are just two different options...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21197: [SPARK-23971] [BACKPORT-2.3] Should not leak Spark sessi...

2018-05-01 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/21197
  
Currently, the SparkR test fails consistently in many PRs. I can see the 
following error message, and the SparkR test seems to run twice according to 
the log. Could you take a look, @shivaram and @felixcheung?
```
* checking CRAN incoming feasibility ...Error in 
.check_package_CRAN_incoming(pkgdir) : 
  dims [product 24] do not match the length of object [0]
Execution halted
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21201: [SPARK-24128][SQL] Mention configuration option in impli...

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21201
  
**[Test build #90019 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90019/testReport)**
 for PR 21201 at commit 
[`b267513`](https://github.com/apache/spark/commit/b267513eda6cfc9523549d8c24463b660893cda6).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21201: [SPARK-24128][SQL] Mention configuration option in impli...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21201
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21201: [SPARK-24128][SQL] Mention configuration option in impli...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21201
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2783/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21201: [SPARK-24128][SQL] Mention configuration option in impli...

2018-05-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21201
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21201: [SPARK-24128][SQL] Mention configuration option in impli...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21201
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90014/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21201: [SPARK-24128][SQL] Mention configuration option in impli...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21201
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21201: [SPARK-24128][SQL] Mention configuration option in impli...

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21201
  
**[Test build #90014 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90014/testReport)**
 for PR 21201 at commit 
[`b267513`](https://github.com/apache/spark/commit/b267513eda6cfc9523549d8c24463b660893cda6).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13493: [SPARK-15750][MLLib][PYSPARK] Constructing FPGrowth fail...

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/13493
  
**[Test build #4167 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4167/testReport)**
 for PR 13493 at commit 
[`fbbcd26`](https://github.com/apache/spark/commit/fbbcd263c32a008873c7f080e5abadf1c01fa006).
 * This patch **fails Spark unit tests**.
 * This patch **does not merge cleanly**.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20894: [SPARK-23786][SQL] Checking column names of csv headers

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20894
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20894: [SPARK-23786][SQL] Checking column names of csv headers

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20894
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90011/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20894: [SPARK-23786][SQL] Checking column names of csv headers

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20894
  
**[Test build #90011 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90011/testReport)**
 for PR 20894 at commit 
[`ad6cda4`](https://github.com/apache/spark/commit/ad6cda4c9ffe46831d956c5fc92a272d98a4e731).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21040: [SPARK-23930][SQL] Add slice function

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21040
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21040: [SPARK-23930][SQL] Add slice function

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21040
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90005/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21040: [SPARK-23930][SQL] Add slice function

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21040
  
**[Test build #90005 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90005/testReport)**
 for PR 21040 at commit 
[`9d65570`](https://github.com/apache/spark/commit/9d655708c2f0bbf18ab7044fb03cf899a0eba4eb).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21050: [SPARK-23912][SQL]add array_distinct

2018-05-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21050
  
cc @ueshin 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-05-01 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21073
  
A test failed with "./bin/spark-submit ... No such file or directory"

It seems like there are lots of spurious test failures right now. I will hold 
off on re-running for a little while.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21208: [SPARK-23925][SQL] Add array_repeat collection function

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21208
  
**[Test build #90018 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90018/testReport)**
 for PR 21208 at commit 
[`88d8425`](https://github.com/apache/spark/commit/88d84252eb87e9d16b0e274db6db007133999e78).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21197: [SPARK-23971] [BACKPORT-2.3] Should not leak Spark sessi...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21197
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21197: [SPARK-23971] [BACKPORT-2.3] Should not leak Spark sessi...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21197
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2782/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21031: [SPARK-23923][SQL] Add cardinality function

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21031
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21197: [SPARK-23971] [BACKPORT-2.3] Should not leak Spark sessi...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21197
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21031: [SPARK-23923][SQL] Add cardinality function

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21031
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2781/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21197: [SPARK-23971] [BACKPORT-2.3] Should not leak Spark sessi...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21197
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2780/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21208: [SPARK-23925][SQL] Add array_repeat collection function

2018-05-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21208
  
cc @ueshin 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21208: [SPARK-23925][SQL] Add array_repeat collection function

2018-05-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21208
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21031: [SPARK-23923][SQL] Add cardinality function

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21031
  
**[Test build #90017 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90017/testReport)**
 for PR 21031 at commit 
[`dd46bbf`](https://github.com/apache/spark/commit/dd46bbf0379a52cf18315b88e4be374e0d8b1956).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21197: [SPARK-23971] [BACKPORT-2.3] Should not leak Spark sessi...

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21197
  
**[Test build #90016 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90016/testReport)**
 for PR 21197 at commit 
[`c1c6377`](https://github.com/apache/spark/commit/c1c63776687eef0be8f2675b3c247f793b9ea05e).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21197: [SPARK-23971] [BACKPORT-2.3] Should not leak Spark sessi...

2018-05-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21197
  
test this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21031: [SPARK-23923][SQL] Add cardinality function

2018-05-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21031
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21197: [SPARK-23971] [BACKPORT-2.3] Should not leak Spark sessi...

2018-05-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21197
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17086: [SPARK-24101][ML][MLLIB] ML Evaluators should use weight...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17086
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17086: [SPARK-24101][ML][MLLIB] ML Evaluators should use weight...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17086
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90008/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17086: [SPARK-24101][ML][MLLIB] ML Evaluators should use weight...

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17086
  
**[Test build #90008 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90008/testReport)**
 for PR 17086 at commit 
[`0ddf854`](https://github.com/apache/spark/commit/0ddf8544fdcf6aa261e0db52eee532a6d6e3cb1e).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20235: [Spark-22887][ML][TESTS][WIP] ML test for StructuredStre...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20235
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90006/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20235: [Spark-22887][ML][TESTS][WIP] ML test for StructuredStre...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20235
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20235: [Spark-22887][ML][TESTS][WIP] ML test for StructuredStre...

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20235
  
**[Test build #90006 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90006/testReport)**
 for PR 20235 at commit 
[`c0f3056`](https://github.com/apache/spark/commit/c0f3056d8b737ab23621950d58da714188fe641c).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185351152
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
--- End diff --

I think it's a bad idea to have sqlContext in the constructor because this 
can accidentally serialize the SQLContext, and we don't want that. In fact, if 
you only need a few confs from the SQLContext, then maybe we should only have 
those in the constructor.
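
A hedged sketch of that suggestion (not the PR's actual code):

```scala
// Pass only the two conf values so the closure never captures the
// SQLContext itself.
class ContinuousDataSourceRDD(
    sc: SparkContext,
    dataQueueSize: Int,
    epochPollIntervalMs: Long,
    @transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]])
  extends RDD[UnsafeRow](sc, Nil)
```

The caller would read continuousStreamingExecutorQueueSize and 
continuousStreamingExecutorPollIntervalMs from the conf on the driver and 
pass the plain values in.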


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20261: [SPARK-22885][ML][TEST] ML test for StructuredStreaming:...

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20261
  
**[Test build #4168 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4168/testReport)**
 for PR 20261 at commit 
[`d52b8ca`](https://github.com/apache/spark/commit/d52b8ca9d4a8d46e9e9e3489401fe8e01cfc7c73).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21206: [SPARK-24133][SQL] Check for integer overflows when resi...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21206
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21206: [SPARK-24133][SQL] Check for integer overflows when resi...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21206
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2779/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21196: [SPARK-24123][SQL] Fix precision issues in monthsBetween...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21196
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21196: [SPARK-24123][SQL] Fix precision issues in monthsBetween...

2018-05-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21196
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89997/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21206: [SPARK-24133][SQL] Check for integer overflows when resi...

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21206
  
**[Test build #90015 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90015/testReport)**
 for PR 21206 at commit 
[`ddfd147`](https://github.com/apache/spark/commit/ddfd1475d1ca3cc185ba298ae5927525e766fc73).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21196: [SPARK-24123][SQL] Fix precision issues in monthsBetween...

2018-05-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21196
  
**[Test build #89997 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89997/testReport)**
 for PR 21196 at commit 
[`4d1775c`](https://github.com/apache/spark/commit/4d1775c2dc40cbbed88042803d21804a6f6ab1dd).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


