[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3801#discussion_r22449971
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
   // failure, are re-processed or not.
   test("recovery with file input stream") {
 // Set up the streaming context and input streams
+val batchDuration = Seconds(2)  // Due to 1-second resolution of 
setLastModified() on some OS's.
 val testDir = Utils.createTempDir()
-var ssc = new StreamingContext(master, framework, Seconds(1))
-ssc.checkpoint(checkpointDir)
-val fileStream = ssc.textFileStream(testDir.toString)
-// Making value 3 take large time to process, to ensure that the master
-// shuts down in the middle of processing the 3rd batch
-val mappedStream = fileStream.map(s => {
-  val i = s.toInt
-  if (i == 3) Thread.sleep(2000)
-  i
-})
-
-// Reducing over a large window to ensure that recovery from master 
failure
-// requires reprocessing of all the files seen before the failure
-val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), 
Seconds(1))
-val outputBuffer = new ArrayBuffer[Seq[Int]]
-var outputStream = new TestOutputStream(reducedStream, outputBuffer)
-outputStream.register()
-ssc.start()
-
-// Create files and advance manual clock to process them
-// var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-Thread.sleep(1000)
-for (i <- Seq(1, 2, 3)) {
-  Files.write(i + "\n", new File(testDir, i.toString), 
Charset.forName("UTF-8"))
-  // wait to make sure that the file is written such that it gets 
shown in the file listings
-  Thread.sleep(1000)
+val outputBuffer = new ArrayBuffer[Seq[Int]] with 
SynchronizedBuffer[Seq[Int]]
+
+def writeFile(i: Int, clock: ManualClock): Unit = {
+  val file = new File(testDir, i.toString)
+  Files.write(i + "\n", file, Charsets.UTF_8)
+  assert(file.setLastModified(clock.currentTime()))
+  // Check that the file's modification date is actually the value we 
wrote, since rounding or
+  // truncation will break the test:
+  assert(file.lastModified() === clock.currentTime())
 }
-logInfo("Output = " + outputStream.output.mkString(","))
-assert(outputStream.output.size > 0, "No files processed before 
restart")
-ssc.stop()
 
-// Verify whether files created have been recorded correctly or not
-var fileInputDStream = 
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-def recordedFiles = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten
-assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
-
-// Create files while the master is down
-for (i <- Seq(4, 5, 6)) {
-  Files.write(i + "\n", new File(testDir, i.toString), 
Charset.forName("UTF-8"))
-  Thread.sleep(1000)
+def recordedFiles(ssc: StreamingContext): Seq[Int] = {
+  val fileInputDStream =
+ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, 
_, _]]
+  val filenames = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten
+  filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
 }
 
-// Recover context from checkpoint file and verify whether the files 
that were
-// recorded before failure were saved and successfully recovered
-logInfo("*** RESTARTING ")
-ssc = new StreamingContext(checkpointDir)
-fileInputDStream = 
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+try {
+  // This is a var because it's re-assigned when we restart from a 
checkpoint:
+  var clock: ManualClock = null
+  withStreamingContext(new StreamingContext(conf, batchDuration)) { 
ssc =>
+ssc.checkpoint(checkpointDir)
+clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+val waiter = new StreamingTestWaiter(ssc)
+val fileStream = ssc.textFileStream(testDir.toString)
+// Making value 3 take a large time to process, to ensure that the driver
+// shuts down in the middle of processing the 3rd batch
+val mappedStream = fileStream.map(s => {
+  val i = s.toInt
+  if (i == 3) Thread.

[GitHub] spark pull request: [SPARK-4226][SQL] SparkSQL - Add support for s...

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3249#issuecomment-68677425
  
  [Test build #25049 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25049/consoleFull)
 for   PR 3249 at commit 
[`09e471a`](https://github.com/apache/spark/commit/09e471aa670298bcb1e8e135d260da54a11de88e).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class SubqueryExpression(subquery: LogicalPlan) extends 
Expression `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4226][SQL] SparkSQL - Add support for s...

2015-01-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3249#issuecomment-68677427
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25049/
Test PASSed.





[GitHub] spark pull request: SPARK-5087. [YARN] Merge yarn.Client and yarn....

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3896#issuecomment-68677294
  
  [Test build #25048 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25048/consoleFull)
 for   PR 3896 at commit 
[`930df8a`](https://github.com/apache/spark/commit/930df8ae21fd1efec18e26f03dcce4532bb95a24).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: SPARK-5087. [YARN] Merge yarn.Client and yarn....

2015-01-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3896#issuecomment-68677297
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25048/
Test PASSed.





[GitHub] spark pull request: [SPARK-5068][SQL]fix bug query data when path ...

2015-01-04 Thread jeanlyn
Github user jeanlyn commented on the pull request:

https://github.com/apache/spark/pull/3891#issuecomment-68676770
  
Hi @marmbrus, can you please take a look and give some suggestions? Thanks.





[GitHub] spark pull request: [SPARK-1507][YARN]specify num of cores for AM

2015-01-04 Thread XuTingjun
Github user XuTingjun commented on the pull request:

https://github.com/apache/spark/pull/3806#issuecomment-68676616
  
@sryza, I have split spark.driver.memory into spark.driver.memory and 
spark.yarn.am.memory. Please have a look.





[GitHub] spark pull request: [SPARK-5084][SQL]add if not exists after creat...

2015-01-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3895#issuecomment-68676566
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25050/
Test FAILed.





[GitHub] spark pull request: [SPARK-5084][SQL]add if not exists after creat...

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3895#issuecomment-68676564
  
  [Test build #25050 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25050/consoleFull)
 for   PR 3895 at commit 
[`1757fde`](https://github.com/apache/spark/commit/1757fde12d90d3d6817877ee67e8374ed819cfe6).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-5088] Use spark-class for running execu...

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3897#issuecomment-68676178
  
  [Test build #25051 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25051/consoleFull)
 for   PR 3897 at commit 
[`ed906b7`](https://github.com/apache/spark/commit/ed906b7ae16c08521f8e3b4aa3da47fd293abff3).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-5088] Use spark-class for running execu...

2015-01-04 Thread jongyoul
GitHub user jongyoul opened a pull request:

https://github.com/apache/spark/pull/3897

[SPARK-5088] Use spark-class for running executors directly



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jongyoul/spark SPARK-5088

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3897.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3897


commit 4949cd5de383f4aba8555ea5935ced03dbeaa6c4
Author: Jongyoul Lee 
Date:   2015-01-05T07:24:16Z

[SPARK-5088] Use spark-class for running executors directly
- Changed command for using spark-class directly
- Delete sbin/spark-executor and moved some codes into spark-class' case 
statement

commit ed906b7ae16c08521f8e3b4aa3da47fd293abff3
Author: Jongyoul Lee 
Date:   2015-01-05T07:25:47Z

[SPARK-5088] Use spark-class for running executors directly
- Adjusted orders of import







[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3801#discussion_r22449487
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---
@@ -104,6 +106,77 @@ class TestOutputStreamWithPartitions[T: 
ClassTag](parent: DStream[T],
 }
 
 /**
+ * This is an interface that can be used to block until certain events 
occur, such as
+ * the start/completion of batches.  This is much less brittle than 
waiting on wall-clock time.
+ * Internally, this is implemented using a StreamingListener.  
Constructing a new instance of this
+ * class automatically registers a StreamingListener on the given 
StreamingContext.
+ */
+class StreamingTestWaiter(ssc: StreamingContext) {
+
+  // All access to this state should be guarded by 
`StreamingTestWaiter.this.synchronized`
+  private var numCompletedBatches = 0
+  private var numStartedBatches = 0
+
+  private val listener = new StreamingListener {
+override def onBatchStarted(batchStarted: 
StreamingListenerBatchStarted): Unit =
+  StreamingTestWaiter.this.synchronized {
+numStartedBatches += 1
+StreamingTestWaiter.this.notifyAll()
+  }
+override def onBatchCompleted(batchCompleted: 
StreamingListenerBatchCompleted): Unit =
+  StreamingTestWaiter.this.synchronized {
+numCompletedBatches += 1
+StreamingTestWaiter.this.notifyAll()
+  }
+  }
+  ssc.addStreamingListener(listener)
+
+  def getNumCompletedBatches: Int = this.synchronized {
+numCompletedBatches
+  }
+
+  def getNumStartedBatches: Int = this.synchronized {
+numStartedBatches
+  }
+
+  /**
+   * Block until the number of completed batches reaches the given 
threshold.
+   */
+  def waitForTotalBatchesCompleted(
--- End diff --

Actually, even this might be overkill: I can just use vanilla `eventually` 
since we're still blocking on a condition to occur and not relying on 
real-clock time; I guess using `wait` instead of `sleep` is just an 
optimization that might save a small amount of test time, but it's not related 
to flakiness (the goal of this PR).  Therefore, I'll just remove all of this in 
favor of `eventually` since then I'll benefit from ScalaTest's nice assertion 
macros.
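
For illustration, the polling approach can be sketched without any test-framework dependency. This is only a minimal stand-in, not ScalaTest's actual `eventually` implementation: the names `PollingSketch`, `eventuallySketch`, `timeoutMs` and `intervalMs` are hypothetical and do not come from the PR.

```scala
import scala.annotation.tailrec
import scala.util.control.NonFatal

object PollingSketch {
  /**
   * Hypothetical stand-in for an `eventually`-style helper: re-runs `block`
   * until it stops throwing, or rethrows the last failure once `timeoutMs`
   * has elapsed. Sleeps `intervalMs` between attempts.
   */
  def eventuallySketch[A](timeoutMs: Long, intervalMs: Long = 10L)(block: => A): A = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L

    @tailrec
    def attempt(): A = {
      // Capture the outcome first so the retry happens outside the try/catch.
      val result: Either[Throwable, A] =
        try Right(block)
        catch { case NonFatal(e) => Left(e) }
      result match {
        case Right(value) => value
        case Left(error) if System.nanoTime() >= deadline => throw error
        case Left(_) =>
          Thread.sleep(intervalMs)
          attempt()
      }
    }

    attempt()
  }
}
```

A test would wrap its assertion in the helper, for example `PollingSketch.eventuallySketch(timeoutMs = 10000L) { assert(outputBuffer.nonEmpty) }`. The condition is still what ends the wait; the sleep only controls how often it is re-checked.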





[GitHub] spark pull request: [SPARK-5084][SQL]add if not exists after creat...

2015-01-04 Thread baishuo
Github user baishuo commented on the pull request:

https://github.com/apache/spark/pull/3895#issuecomment-68675593
  
Some explanation:
If we want to use MySQL instead of Derby to store the metadata for 
spark-sql and add a parameter such as 
"javax.jdo.option.ConnectionURL" to hive-site.xml, it will 
always be overridden by the hard-coded value in TestHive.scala or HiveContext.scala.
And if there is already a database called "default" in the metastore storage, 
the code `context.runSqlHive("CREATE DATABASE default")` will throw an 
exception: Database default already exists





[GitHub] spark pull request: [SPARK-5084][SQL]add if not exists after creat...

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3895#issuecomment-68675297
  
  [Test build #25050 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25050/consoleFull)
 for   PR 3895 at commit 
[`1757fde`](https://github.com/apache/spark/commit/1757fde12d90d3d6817877ee67e8374ed819cfe6).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-5084][SQL]add if not exists after creat...

2015-01-04 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/3895#issuecomment-68675083
  
ok to test





[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3801#discussion_r22449238
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---
@@ -104,6 +106,77 @@ class TestOutputStreamWithPartitions[T: 
ClassTag](parent: DStream[T],
 }
 
 /**
+ * This is an interface that can be used to block until certain events 
occur, such as
+ * the start/completion of batches.  This is much less brittle than 
waiting on wall-clock time.
+ * Internally, this is implemented using a StreamingListener.  
Constructing a new instance of this
+ * class automatically registers a StreamingListener on the given 
StreamingContext.
+ */
+class StreamingTestWaiter(ssc: StreamingContext) {
+
+  // All access to this state should be guarded by 
`StreamingTestWaiter.this.synchronized`
+  private var numCompletedBatches = 0
+  private var numStartedBatches = 0
+
+  private val listener = new StreamingListener {
+override def onBatchStarted(batchStarted: 
StreamingListenerBatchStarted): Unit =
+  StreamingTestWaiter.this.synchronized {
+numStartedBatches += 1
+StreamingTestWaiter.this.notifyAll()
+  }
+override def onBatchCompleted(batchCompleted: 
StreamingListenerBatchCompleted): Unit =
+  StreamingTestWaiter.this.synchronized {
+numCompletedBatches += 1
+StreamingTestWaiter.this.notifyAll()
+  }
+  }
+  ssc.addStreamingListener(listener)
+
+  def getNumCompletedBatches: Int = this.synchronized {
+numCompletedBatches
+  }
+
+  def getNumStartedBatches: Int = this.synchronized {
+numStartedBatches
+  }
+
+  /**
+   * Block until the number of completed batches reaches the given 
threshold.
+   */
+  def waitForTotalBatchesCompleted(
--- End diff --

Actually, something even simpler (since this is just for test code): I can 
just copy ScalaTest's `eventually` and modify it to use synchronize / notify 
instead of polling.





[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3801#discussion_r22449114
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---
@@ -104,6 +106,77 @@ class TestOutputStreamWithPartitions[T: 
ClassTag](parent: DStream[T],
 }
 
 /**
+ * This is an interface that can be used to block until certain events 
occur, such as
+ * the start/completion of batches.  This is much less brittle than 
waiting on wall-clock time.
+ * Internally, this is implemented using a StreamingListener.  
Constructing a new instance of this
+ * class automatically registers a StreamingListener on the given 
StreamingContext.
+ */
+class StreamingTestWaiter(ssc: StreamingContext) {
+
+  // All access to this state should be guarded by 
`StreamingTestWaiter.this.synchronized`
+  private var numCompletedBatches = 0
+  private var numStartedBatches = 0
+
+  private val listener = new StreamingListener {
+override def onBatchStarted(batchStarted: 
StreamingListenerBatchStarted): Unit =
+  StreamingTestWaiter.this.synchronized {
+numStartedBatches += 1
+StreamingTestWaiter.this.notifyAll()
+  }
+override def onBatchCompleted(batchCompleted: 
StreamingListenerBatchCompleted): Unit =
+  StreamingTestWaiter.this.synchronized {
+numCompletedBatches += 1
+StreamingTestWaiter.this.notifyAll()
+  }
+  }
+  ssc.addStreamingListener(listener)
+
+  def getNumCompletedBatches: Int = this.synchronized {
+numCompletedBatches
+  }
+
+  def getNumStartedBatches: Int = this.synchronized {
+numStartedBatches
+  }
+
+  /**
+   * Block until the number of completed batches reaches the given 
threshold.
+   */
+  def waitForTotalBatchesCompleted(
--- End diff --

It occurred to me that this might be misleadingly-named since it waits 
until _at least_ that many batches have been processed.  To avoid this naming 
issue, plus a proliferation of similar methods, I might be able to just 
introduce a helper class that encapsulates this "synchronize on an object and 
wait for a condition involving it to become true" pattern.

I'm imagining that it could look something vaguely like

```scala
def waitUntil[T](obj: T, condition: T => Boolean, timeout: Long): Unit = {
  obj.synchronized {
    while (!condition(obj)) {
      [...] // do the wait() logic here
    }
  }
}
```

that encapsulates this wait / notify pattern, so we could write something like

```scala
waitUntil(waiter, _.completedBatches > 2, timeout = seconds(10))
```

Or, with an implicit conversion, something like

```scala
waiter.waitUntil(_.completedBatches > 2, timeout=seconds(10))
```

which is a nice-looking syntax and avoids those issues of having to name 
inequalities.

Similar to [your 
suggestion](https://github.com/apache/spark/pull/3868/files#r22447509) on 
another PR, we could add a `pollUntil` method that works for objects that don't 
support monitor notification / synchronization.
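
To make the idea concrete, here is one way the wait / notify helper could be filled in. It is a rough sketch under several assumptions rather than the code that ended up in the PR: `WaitUntilSketch`, `BatchCounter`, `WaitUntilOps` and the millisecond `timeoutMs` parameter are hypothetical names, the `T <: AnyRef` bound is added so that the monitor methods are available, and the condition is moved into a second parameter list so that `_.completedBatches` type-checks without annotations.

```scala
import java.util.concurrent.TimeoutException

object WaitUntilSketch {
  /**
   * Hypothetical helper: blocks on `obj`'s monitor until `condition(obj)`
   * holds, or throws a TimeoutException after `timeoutMs` milliseconds.
   */
  def waitUntil[T <: AnyRef](obj: T, timeoutMs: Long)(condition: T => Boolean): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    obj.synchronized {
      // Loop rather than `if`, to guard against spurious wakeups.
      while (!condition(obj)) {
        val remaining = deadline - System.currentTimeMillis()
        if (remaining <= 0) {
          throw new TimeoutException(s"condition not met within $timeoutMs ms")
        }
        // wait() releases the monitor until notifyAll() is called or time runs out.
        obj.wait(remaining)
      }
    }
  }

  /** Toy stand-in for StreamingTestWaiter: it must notifyAll() on every state change. */
  class BatchCounter {
    private var completed = 0
    def completedBatches: Int = synchronized { completed }
    def batchCompleted(): Unit = synchronized { completed += 1; notifyAll() }
  }

  /** Implicit wrapper that provides the `waiter.waitUntil(...)` syntax. */
  implicit class WaitUntilOps[T <: AnyRef](obj: T) {
    def waitUntil(timeoutMs: Long)(condition: T => Boolean): Unit =
      WaitUntilSketch.waitUntil(obj, timeoutMs)(condition)
  }
}
```

With `import WaitUntilSketch._` in scope, a caller could write `counter.waitUntil(timeoutMs = 10000L)(_.completedBatches > 2)`. The helper only works if the waited-on object calls `notifyAll()` while holding its own monitor whenever the observed state changes; otherwise the waiter wakes up only when the timeout expires.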





[GitHub] spark pull request: [SPARK-4233] [SQL] WIP:Simplify the UDAF API (...

2015-01-04 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/3247#issuecomment-68674051
  
@marmbrus, this PR passes the unit tests, but some of the details need to be 
discussed. Can you review this, particularly the UDAF interface design?

Sorry about so many code changes; I rewrote almost all of the UDAF-related 
code.





[GitHub] spark pull request: [SPARK-4226][SQL] SparkSQL - Add support for s...

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3249#issuecomment-68673730
  
  [Test build #25049 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25049/consoleFull)
 for   PR 3249 at commit 
[`09e471a`](https://github.com/apache/spark/commit/09e471aa670298bcb1e8e135d260da54a11de88e).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3801#discussion_r22448937
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
   // failure, are re-processed or not.
   test("recovery with file input stream") {
 // Set up the streaming context and input streams
+val batchDuration = Seconds(2)  // Due to 1-second resolution of 
setLastModified() on some OS's.
 val testDir = Utils.createTempDir()
-var ssc = new StreamingContext(master, framework, Seconds(1))
-ssc.checkpoint(checkpointDir)
-val fileStream = ssc.textFileStream(testDir.toString)
-// Making value 3 take large time to process, to ensure that the master
-// shuts down in the middle of processing the 3rd batch
-val mappedStream = fileStream.map(s => {
-  val i = s.toInt
-  if (i == 3) Thread.sleep(2000)
-  i
-})
-
-// Reducing over a large window to ensure that recovery from master 
failure
-// requires reprocessing of all the files seen before the failure
-val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), 
Seconds(1))
-val outputBuffer = new ArrayBuffer[Seq[Int]]
-var outputStream = new TestOutputStream(reducedStream, outputBuffer)
-outputStream.register()
-ssc.start()
-
-// Create files and advance manual clock to process them
-// var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-Thread.sleep(1000)
-for (i <- Seq(1, 2, 3)) {
-  Files.write(i + "\n", new File(testDir, i.toString), 
Charset.forName("UTF-8"))
-  // wait to make sure that the file is written such that it gets 
shown in the file listings
-  Thread.sleep(1000)
+val outputBuffer = new ArrayBuffer[Seq[Int]] with 
SynchronizedBuffer[Seq[Int]]
+
+def writeFile(i: Int, clock: ManualClock): Unit = {
+  val file = new File(testDir, i.toString)
+  Files.write(i + "\n", file, Charsets.UTF_8)
+  assert(file.setLastModified(clock.currentTime()))
+  // Check that the file's modification date is actually the value we 
wrote, since rounding or
+  // truncation will break the test:
+  assert(file.lastModified() === clock.currentTime())
 }
-logInfo("Output = " + outputStream.output.mkString(","))
-assert(outputStream.output.size > 0, "No files processed before 
restart")
-ssc.stop()
 
-// Verify whether files created have been recorded correctly or not
-var fileInputDStream = 
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-def recordedFiles = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten
-assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
-
-// Create files while the master is down
-for (i <- Seq(4, 5, 6)) {
-  Files.write(i + "\n", new File(testDir, i.toString), 
Charset.forName("UTF-8"))
-  Thread.sleep(1000)
+def recordedFiles(ssc: StreamingContext): Seq[Int] = {
+  val fileInputDStream =
+ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, 
_, _]]
+  val filenames = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten
+  filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
 }
 
-// Recover context from checkpoint file and verify whether the files 
that were
-// recorded before failure were saved and successfully recovered
-logInfo("*** RESTARTING ")
-ssc = new StreamingContext(checkpointDir)
-fileInputDStream = 
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+try {
+  // This is a var because it's re-assigned when we restart from a 
checkpoint:
+  var clock: ManualClock = null
+  withStreamingContext(new StreamingContext(conf, batchDuration)) { 
ssc =>
+ssc.checkpoint(checkpointDir)
+clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+val waiter = new StreamingTestWaiter(ssc)
+val fileStream = ssc.textFileStream(testDir.toString)
+// Making value 3 take a large time to process, to ensure that the driver
+// shuts down in the middle of processing the 3rd batch
+val mappedStream = fileStream.map(s => {
+  val i = s.toInt
+  if (i == 3) Thread.

[GitHub] spark pull request: [SPARK-4226][SQL] SparkSQL - Add support for s...

2015-01-04 Thread ravipesala
Github user ravipesala commented on the pull request:

https://github.com/apache/spark/pull/3249#issuecomment-68673618
  
Thank you for reviewing it. I fixed the review comments and added a TODO 
for future expansion to complex queries.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: SPARK-5087. [YARN] Merge yarn.Client and yarn....

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3896#issuecomment-68673487
  
  [Test build #25048 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25048/consoleFull)
 for   PR 3896 at commit 
[`930df8a`](https://github.com/apache/spark/commit/930df8ae21fd1efec18e26f03dcce4532bb95a24).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4226][SQL] SparkSQL - Add support for s...

2015-01-04 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/spark/pull/3249#discussion_r22448876
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -414,6 +418,123 @@ class Analyzer(catalog: Catalog,
 Generate(g, join = false, outer = false, None, child)
 }
   }
+
+  /**
+   * Transforms the query which has subquery expressions in where clause 
to join queries.
+   * Case 1 Uncorelated queries
+   * -- original query
+   * select C from R1 where R1.A in (Select B from R2)
+   * -- rewritten query
+   * Select C from R1 left semi join (select B as sqc0 from R2) subquery 
on R1.A = subquery.sqc0
+   *
+   * Case 2 Corelated queries
+   * -- original query
+   * select C from R1 where R1.A in (Select B from R2 where R1.X = R2.Y)
+   * -- rewritten query
+   * select C from R1 left semi join (select B as sqc0, R2.Y as sqc1 from 
R2) subquery
+   *   on R1.X = subquery.sqc1 and R1.A = subquery.sqc0
+   * 
+   * Refer: 
https://issues.apache.org/jira/secure/attachment/12614003/SubQuerySpec.pdf
+   */
+  object SubQueryExpressions extends Rule[LogicalPlan] {
+
+def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case p: LogicalPlan if !p.childrenResolved => p
+  case filter @ Filter(conditions, child) =>
+val subqueryExprs = new scala.collection.mutable.ArrayBuffer[In]()
+val nonSubQueryConds = new 
scala.collection.mutable.ArrayBuffer[Expression]()
+conditions.collect {
+  case s @ In(exp, Seq(SubqueryExpression(subquery))) =>
+subqueryExprs += s
+}
+val transformedConds = conditions.transform {
+  // Replace with dummy
+  case s @ In(exp,Seq(SubqueryExpression(subquery))) =>
+Literal(true)
+}
+if (subqueryExprs.size == 1) {
+  val subqueryExpr = subqueryExprs.remove(0)
+  createLeftSemiJoin(
+child,
+subqueryExpr.value,
+subqueryExpr.list(0).asInstanceOf[SubqueryExpression].subquery,
+transformedConds)
+} else if (subqueryExprs.size > 1) {
+  // Only one subquery expression is supported.
+  throw new TreeNodeException(filter, "Only one SubQuery 
expression is supported.")
+} else {
+  filter
+}
+}
+
+/**
+ * Create LeftSemi join with parent query to the subquery which is 
mentioned in 'IN' predicate
+ * And combine the subquery conditions and parent query conditions.
+ */ 
+def createLeftSemiJoin(left: LogicalPlan,
+value: Expression,
+subquery: LogicalPlan,
+parentConds: Expression) : LogicalPlan = {
+  val (transformedPlan, subqueryConds) = 
transformAndGetConditions(value, subquery)
+  // Unify the parent query conditions and subquery conditions and add 
these as join conditions
+  val unifyConds = And(parentConds, subqueryConds)
+  Join(left, transformedPlan, LeftSemi, Some(unifyConds))
+}
+
+/**
+ * Transform the subquery LogicalPlan and add the expressions which 
are used as filters to the
+ * projection. And also return filter conditions used in subquery
+ */
+def transformAndGetConditions(value: Expression,
+  subquery: LogicalPlan): (LogicalPlan, Expression) = {
+  val expr = new scala.collection.mutable.ArrayBuffer[Expression]()
+  val transformedPlan = subquery transform {
+case project @ Project(projectList, f @ Filter(condition, child)) 
=>
--- End diff --

Yes, this type of query cannot be evaluated in the present code. Maybe we 
can expand it in the future. I will add the TODO. Even Hive does not seem to 
support this type of query.
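
For readers following the Javadoc in the quoted diff, the two rewrites it describes (Case 1 and Case 2) can be sketched on plain Scala collections. This is only an illustration of why an IN predicate and a left semi join select the same rows. It is not the Catalyst rule from the patch. The row classes and function names are hypothetical, and SQL NULL semantics are not modeled.

```scala
object SemiJoinSketch {
  // Hypothetical in-memory rows; the column names A, X, C, B, Y follow the Javadoc.
  final case class R1Row(a: Int, x: Int, c: String)
  final case class R2Row(b: Int, y: Int)

  // Case 1, original form:
  //   select C from R1 where R1.A in (select B from R2)
  def uncorrelatedIn(r1: Seq[R1Row], r2: Seq[R2Row]): Seq[String] =
    r1.filter(l => r2.map(_.b).contains(l.a)).map(_.c)

  // Case 1, rewritten form: left semi join on R1.A = subquery.sqc0.
  // A semi join keeps each left row at most once, however many right rows match.
  def uncorrelatedSemiJoin(r1: Seq[R1Row], r2: Seq[R2Row]): Seq[String] = {
    val subquery = r2.map(_.b) // sqc0
    r1.filter(l => subquery.exists(sqc0 => l.a == sqc0)).map(_.c)
  }

  // Case 2, original form:
  //   select C from R1 where R1.A in (select B from R2 where R1.X = R2.Y)
  def correlatedIn(r1: Seq[R1Row], r2: Seq[R2Row]): Seq[String] =
    r1.filter(l => r2.filter(r => l.x == r.y).map(_.b).contains(l.a)).map(_.c)

  // Case 2, rewritten form: the correlated column Y is added to the subquery
  // projection as sqc1, and both predicates become join conditions.
  def correlatedSemiJoin(r1: Seq[R1Row], r2: Seq[R2Row]): Seq[String] = {
    val subquery = r2.map(r => (r.b, r.y)) // (sqc0, sqc1)
    r1.filter { l =>
      subquery.exists { case (sqc0, sqc1) => l.x == sqc1 && l.a == sqc0 }
    }.map(_.c)
  }
}
```

On any input without NULLs, each original form and its rewritten form should return the same rows, which is the property the rule relies on.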





[GitHub] spark pull request: [SPARK-1507][YARN]specify num of cores for AM

2015-01-04 Thread XuTingjun
Github user XuTingjun commented on the pull request:

https://github.com/apache/spark/pull/3806#issuecomment-68673436
  
Yeah, I agree with you. I will fix this later. Thanks @sryza





[GitHub] spark pull request: SPARK-5087. [YARN] Merge yarn.Client and yarn....

2015-01-04 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/3896#issuecomment-68673381
  
@andrewor14 @tgravescs





[GitHub] spark pull request: [SPARK-4226][SQL] SparkSQL - Add support for s...

2015-01-04 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/spark/pull/3249#discussion_r22448850
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -414,6 +418,123 @@ class Analyzer(catalog: Catalog,
 Generate(g, join = false, outer = false, None, child)
 }
   }
+
+  /**
+   * Transforms the query which has subquery expressions in where clause 
to join queries.
+   * Case 1 Uncorelated queries
+   * -- original query
+   * select C from R1 where R1.A in (Select B from R2)
+   * -- rewritten query
+   * Select C from R1 left semi join (select B as sqc0 from R2) subquery 
on R1.A = subquery.sqc0
+   *
+   * Case 2 Corelated queries
+   * -- original query
+   * select C from R1 where R1.A in (Select B from R2 where R1.X = R2.Y)
+   * -- rewritten query
+   * select C from R1 left semi join (select B as sqc0, R2.Y as sqc1 from 
R2) subquery
+   *   on R1.X = subquery.sqc1 and R1.A = subquery.sqc0
+   * 
+   * Refer: 
https://issues.apache.org/jira/secure/attachment/12614003/SubQuerySpec.pdf
+   */
+  object SubQueryExpressions extends Rule[LogicalPlan] {
+
+def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case p: LogicalPlan if !p.childrenResolved => p
+  case filter @ Filter(conditions, child) =>
+val subqueryExprs = new scala.collection.mutable.ArrayBuffer[In]()
+val nonSubQueryConds = new 
scala.collection.mutable.ArrayBuffer[Expression]()
+conditions.collect {
+  case s @ In(exp, Seq(SubqueryExpression(subquery))) =>
+subqueryExprs += s
+}
+val transformedConds = conditions.transform {
+  // Replace with dummy
+  case s @ In(exp,Seq(SubqueryExpression(subquery))) =>
+Literal(true)
+}
+if (subqueryExprs.size == 1) {
+  val subqueryExpr = subqueryExprs.remove(0)
+  createLeftSemiJoin(
+child,
+subqueryExpr.value,
+subqueryExpr.list(0).asInstanceOf[SubqueryExpression].subquery,
+transformedConds)
+} else if (subqueryExprs.size > 1) {
+  // Only one subquery expression is supported.
+  throw new TreeNodeException(filter, "Only one SubQuery 
expression is supported.")
+} else {
+  filter
+}
+}
+
+/**
+ * Create LeftSemi join with parent query to the subquery which is 
mentioned in 'IN' predicate
+ * And combine the subquery conditions and parent query conditions.
+ */ 
+def createLeftSemiJoin(left: LogicalPlan,
+value: Expression,
+subquery: LogicalPlan,
+parentConds: Expression) : LogicalPlan = {
+  val (transformedPlan, subqueryConds) = 
transformAndGetConditions(value, subquery)
+  // Unify the parent query conditions and subquery conditions and add 
these as join conditions
+  val unifyConds = And(parentConds, subqueryConds)
+  Join(left, transformedPlan, LeftSemi, Some(unifyConds))
+}
+
+/**
+ * Transform the subquery LogicalPlan and add the expressions which 
are used as filters to the
+ * projection. And also return filter conditions used in subquery
+ */
+def transformAndGetConditions(value: Expression,
+  subquery: LogicalPlan): (LogicalPlan, Expression) = {
+  val expr = new scala.collection.mutable.ArrayBuffer[Expression]()
+  val transformedPlan = subquery transform {
+case project @ Project(projectList, f @ Filter(condition, child)) 
=>
+  // Don't support more than one item in select list of subquery
+  if(projectList.size > 1) {
+throw new TreeNodeException(
+project,
+"SubQuery can contain only one item in Select List")
+  }
+  val resolvedChild = ResolveRelations(child)
+  // Add the expressions to the projections which are used as 
filters in subquery
+  val toBeAddedExprs = f.references.filter{a =>
+resolvedChild.resolve(a.name, resolver) != None && 
!project.outputSet.contains(a)}
+  val nameToExprMap = collection.mutable.Map[String, Alias]()
+  // Create aliases for all projection expressions.
+  val witAliases = (projectList ++ 
toBeAddedExprs).zipWithIndex.map {
+case (exp, index) => 
+  nameToExprMap.put(exp.name, Alias(exp, s"sqc$index")())
+  Alias(exp, s"sqc$index")()
+  }
+  // Replace the condition column names with alias names.
+  val transformedConds = condition.transform 

[GitHub] spark pull request: [SPARK-4226][SQL] SparkSQL - Add support for s...

2015-01-04 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/spark/pull/3249#discussion_r22448848
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubqueryExpression.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * Evaluates whether `subquery` result contains `value`. 
+ * For example : 'SELECT * FROM src a WHERE a.key in (SELECT b.key FROM 
src b)'
+ * @param subquery  In the above example 'SELECT b.key FROM src b' is 
'subquery'
+ */
+case class SubqueryExpression(subquery: LogicalPlan) extends Expression {
+
+  type EvaluatedType = Any
+  def dataType = subquery.output.head.dataType
+  override def foldable = false
+  def nullable = true
+  override def toString = s"SubqueryExpression($subquery)"
--- End diff --

OK, I will change it.





[GitHub] spark pull request: SPARK-5087. [YARN] Merge yarn.Client and yarn....

2015-01-04 Thread sryza
GitHub user sryza opened a pull request:

https://github.com/apache/spark/pull/3896

SPARK-5087. [YARN] Merge yarn.Client and yarn.ClientBase



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sryza/spark sandy-spark-5087

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3896.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3896


commit 930df8ae21fd1efec18e26f03dcce4532bb95a24
Author: Sandy Ryza 
Date:   2015-01-05T06:30:08Z

SPARK-5087. [YARN] Merge yarn.Client and yarn.ClientBase







[GitHub] spark pull request: [SPARK-4226][SQL] SparkSQL - Add support for s...

2015-01-04 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/spark/pull/3249#discussion_r22448842
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -414,6 +418,123 @@ class Analyzer(catalog: Catalog,
 Generate(g, join = false, outer = false, None, child)
 }
   }
+
+  /**
+   * Transforms the query which has subquery expressions in where clause 
to join queries.
+   * Case 1 Uncorelated queries
+   * -- original query
+   * select C from R1 where R1.A in (Select B from R2)
+   * -- rewritten query
+   * Select C from R1 left semi join (select B as sqc0 from R2) subquery 
on R1.A = subquery.sqc0
+   *
+   * Case 2 Corelated queries
+   * -- original query
+   * select C from R1 where R1.A in (Select B from R2 where R1.X = R2.Y)
+   * -- rewritten query
+   * select C from R1 left semi join (select B as sqc0, R2.Y as sqc1 from 
R2) subquery
+   *   on R1.X = subquery.sqc1 and R1.A = subquery.sqc0
+   * 
+   * Refer: 
https://issues.apache.org/jira/secure/attachment/12614003/SubQuerySpec.pdf
+   */
+  object SubQueryExpressions extends Rule[LogicalPlan] {
+
+def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case p: LogicalPlan if !p.childrenResolved => p
+  case filter @ Filter(conditions, child) =>
+val subqueryExprs = new scala.collection.mutable.ArrayBuffer[In]()
+val nonSubQueryConds = new 
scala.collection.mutable.ArrayBuffer[Expression]()
+conditions.collect {
+  case s @ In(exp, Seq(SubqueryExpression(subquery))) =>
+subqueryExprs += s
+}
+val transformedConds = conditions.transform {
+  // Replace with dummy
+  case s @ In(exp,Seq(SubqueryExpression(subquery))) =>
+Literal(true)
+}
+if (subqueryExprs.size == 1) {
+  val subqueryExpr = subqueryExprs.remove(0)
+  createLeftSemiJoin(
+child,
+subqueryExpr.value,
+subqueryExpr.list(0).asInstanceOf[SubqueryExpression].subquery,
+transformedConds)
+} else if (subqueryExprs.size > 1) {
+  // Only one subquery expression is supported.
+  throw new TreeNodeException(filter, "Only one SubQuery 
expression is supported.")
+} else {
+  filter
+}
--- End diff --

Thanks for the code snippet. It looks good; I will add it like this.





[GitHub] spark pull request: [SPARK-1507][YARN]specify num of cores for AM

2015-01-04 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/3806#issuecomment-68673313
  
I think the best thing would be to split spark.driver.memory into 
spark.driver.memory and spark.yarn.am.memory, and to have the latter only work 
for the yarn-client AM.
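
A minimal sketch of how that split could be read, assuming the proposal is adopted as stated. The two key names come from the comment above. The helper, the mode strings, and the "512m" fallback are hypothetical and are not taken from Spark's source. A plain `Map` stands in for the configuration object to keep the example self-contained.

```scala
object AmMemorySketch {
  // Assumed fallback when neither key is set; not a value taken from Spark.
  val DefaultMemory = "512m"

  // Hypothetical helper: choose the memory setting that sizes the YARN
  // ApplicationMaster container for a given deploy mode.
  def amMemory(conf: Map[String, String], deployMode: String): String =
    deployMode match {
      // In yarn-cluster mode the driver runs inside the AM, so the driver
      // setting sizes the AM container.
      case "yarn-cluster" => conf.getOrElse("spark.driver.memory", DefaultMemory)
      // In yarn-client mode the AM is a separate, lightweight process, so
      // only the AM-specific key applies to it.
      case "yarn-client" => conf.getOrElse("spark.yarn.am.memory", DefaultMemory)
      case other =>
        throw new IllegalArgumentException(s"unsupported deploy mode: $other")
    }
}
```

With this reading, setting `spark.yarn.am.memory` has no effect in yarn-cluster mode, which matches "have the latter only work for the yarn-client AM".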





[GitHub] spark pull request: [SPARK-5084][SQL]add if not exists after creat...

2015-01-04 Thread baishuo
GitHub user baishuo opened a pull request:

https://github.com/apache/spark/pull/3895

[SPARK-5084][SQL]add if not exists after create database-in Shim13.scala



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/baishuo/spark SPARK-5084-20150105-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3895.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3895


commit 00b08be2253c3ad6ca73f8789e0dd84856e11675
Author: baishuo 
Date:   2015-01-05T03:41:24Z

add if not exists after create database







[GitHub] spark pull request: [SPARK-5084][SQL]add if not exists after creat...

2015-01-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3895#issuecomment-68673229
  
Can one of the admins verify this patch?





[GitHub] spark pull request: [SPARK-4233] [SQL] WIP:Simplify the UDAF API (...

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3247#issuecomment-68672831
  
  [Test build #25047 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25047/consoleFull)
 for   PR 3247 at commit 
[`230e70f`](https://github.com/apache/spark/commit/230e70f5049fa0c0d42c878ecbbb8beb2494b2a1).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class UnresolvedFunction(`
  * `trait AggregateFunction `
  * `trait AggregateExpression extends Expression `
  * `abstract class UnaryAggregateExpression extends UnaryExpression with 
AggregateExpression `
  * `case class Min(`
  * `case class Average(child: Expression, distinct: Boolean = false)`
  * `case class Max(child: Expression, distinct: Boolean = false)`
  * `case class Count(child: Expression)`
  * `case class CountDistinct(children: Seq[Expression])`
  * `case class Sum(child: Expression, distinct: Boolean = false)`
  * `case class First(child: Expression, distinct: Boolean = false)`
  * `case class Last(child: Expression, distinct: Boolean = false)`
  * `case class MinFunction(aggr: BoundReference, base: Min) extends 
AggregateFunction `
  * `case class AverageFunction(count: BoundReference, sum: BoundReference, 
base: Average)`
  * `case class MaxFunction(aggr: BoundReference, base: Max) extends 
AggregateFunction `
  * `case class CountFunction(aggr: BoundReference, base: Count)`
  * `case class CountDistinctFunction(aggr: BoundReference, base: 
CountDistinct)`
  * `case class SumFunction(aggr: BoundReference, base: Sum) extends 
AggregateFunction `
  * `case class FirstFunction(aggr: BoundReference, base: First) extends 
AggregateFunction `
  * `case class LastFunction(aggr: BoundReference, base: 
AggregateExpression) extends AggregateFunction `
  * `sealed case class AggregateFunctionBind(`
  * `sealed class InputBufferSeens(`
  * `sealed trait Aggregate `
  * `sealed trait PreShuffle extends Aggregate `
  * `sealed trait PostShuffle extends Aggregate `
  * `case class AggregatePreShuffle(`
  * `case class AggregatePostShuffle(`
  * `case class DistinctAggregate(`






[GitHub] spark pull request: [SPARK-1507][YARN]specify num of cores for AM

2015-01-04 Thread XuTingjun
Github user XuTingjun commented on the pull request:

https://github.com/apache/spark/pull/3806#issuecomment-68672836
  
@sryza, do you mean that since "spark.driver.memory" works in both yarn-client 
and yarn-cluster mode, we should use a single configuration, maybe named 
"spark.driver.cores", to set the AM cores?





[GitHub] spark pull request: [SPARK-4233] [SQL] WIP:Simplify the UDAF API (...

2015-01-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3247#issuecomment-68672838
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25047/





[GitHub] spark pull request: [SPARK-4803] [streaming] Remove duplicate Regi...

2015-01-04 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3648#issuecomment-68672819
  
Ping, any thoughts?





[GitHub] spark pull request: [SPARK-5063] Useful error messages for nested ...

2015-01-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3884#discussion_r22448625
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -76,10 +76,22 @@ import org.apache.spark.util.random.{BernoulliSampler, 
PoissonSampler, Bernoulli
  * on RDD internals.
  */
 abstract class RDD[T: ClassTag](
-@transient private var sc: SparkContext,
+@transient private var _sc: SparkContext,
 @transient private var deps: Seq[Dependency[_]]
   ) extends Serializable with Logging {
 
+  if (classOf[RDD[_]].isAssignableFrom(elementClassTag.runtimeClass)) {
+throw new SparkException("Spark does not support nested RDDs (see 
SPARK-5063)")
+  }
+
+  private def sc: SparkContext = {
+if (_sc == null) {
+  throw new SparkException(
+"Can only define RDDs and perform actions on the driver, not in 
tasks (see SPARK-5063)")
--- End diff --

Looks good!
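
The constructor-time check in the quoted diff can be tried outside Spark with a toy container. The sketch below assumes nothing about `RDD` beyond what the diff shows: it compares the element type's `ClassTag` runtime class against the container class. `Box` and `SpecialBox` are hypothetical names used only for illustration.

```scala
import scala.reflect.{ClassTag, classTag}

object NestedCheckSketch {
  // Toy stand-in for a container that must not hold instances of itself.
  class Box[T: ClassTag](val items: Seq[T]) {
    // Runs in the constructor: reject element types that are Box or a subclass.
    if (classOf[Box[_]].isAssignableFrom(classTag[T].runtimeClass)) {
      throw new IllegalArgumentException("nested Box is not supported")
    }
  }

  // A subclass, to show that isAssignableFrom also catches subtypes.
  class SpecialBox[T: ClassTag](items: Seq[T]) extends Box[T](items)
}
```

Because the test uses `isAssignableFrom` rather than class equality, a `Box[SpecialBox[Int]]` is rejected as well as a `Box[Box[Int]]`.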





[GitHub] spark pull request: SPARK-4226: Add support for subqueries in wher...

2015-01-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3888#issuecomment-68672348
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25046/





[GitHub] spark pull request: SPARK-4226: Add support for subqueries in wher...

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3888#issuecomment-68672345
  
  [Test build #25046 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25046/consoleFull)
 for   PR 3888 at commit 
[`4019e0d`](https://github.com/apache/spark/commit/4019e0d6e0bf31a123f2817eb964562891211635).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class InSubquery(value: Expression) extends Predicate `
  * `case class DynamicFilter(condition: Expression, left: LogicalPlan, 
right: LogicalPlan)`
  * `case class DynamicFilter(condition: Expression, left: SparkPlan, 
right: SparkPlan)`






[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

2015-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3801#discussion_r22448279
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
   // failure, are re-processed or not.
   test("recovery with file input stream") {
 // Set up the streaming context and input streams
+val batchDuration = Seconds(2)  // Due to 1-second resolution of 
setLastModified() on some OS's.
 val testDir = Utils.createTempDir()
-var ssc = new StreamingContext(master, framework, Seconds(1))
-ssc.checkpoint(checkpointDir)
-val fileStream = ssc.textFileStream(testDir.toString)
-// Making value 3 take large time to process, to ensure that the master
-// shuts down in the middle of processing the 3rd batch
-val mappedStream = fileStream.map(s => {
-  val i = s.toInt
-  if (i == 3) Thread.sleep(2000)
-  i
-})
-
-// Reducing over a large window to ensure that recovery from master 
failure
-// requires reprocessing of all the files seen before the failure
-val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), 
Seconds(1))
-val outputBuffer = new ArrayBuffer[Seq[Int]]
-var outputStream = new TestOutputStream(reducedStream, outputBuffer)
-outputStream.register()
-ssc.start()
-
-// Create files and advance manual clock to process them
-// var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-Thread.sleep(1000)
-for (i <- Seq(1, 2, 3)) {
-  Files.write(i + "\n", new File(testDir, i.toString), 
Charset.forName("UTF-8"))
-  // wait to make sure that the file is written such that it gets 
shown in the file listings
-  Thread.sleep(1000)
+val outputBuffer = new ArrayBuffer[Seq[Int]] with 
SynchronizedBuffer[Seq[Int]]
+
+def writeFile(i: Int, clock: ManualClock): Unit = {
+  val file = new File(testDir, i.toString)
+  Files.write(i + "\n", file, Charsets.UTF_8)
+  assert(file.setLastModified(clock.currentTime()))
+  // Check that the file's modification date is actually the value we 
wrote, since rounding or
+  // truncation will break the test:
+  assert(file.lastModified() === clock.currentTime())
 }
-logInfo("Output = " + outputStream.output.mkString(","))
-assert(outputStream.output.size > 0, "No files processed before 
restart")
-ssc.stop()
 
-// Verify whether files created have been recorded correctly or not
-var fileInputDStream = 
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-def recordedFiles = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten
-assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
-
-// Create files while the master is down
-for (i <- Seq(4, 5, 6)) {
-  Files.write(i + "\n", new File(testDir, i.toString), 
Charset.forName("UTF-8"))
-  Thread.sleep(1000)
+def recordedFiles(ssc: StreamingContext): Seq[Int] = {
+  val fileInputDStream =
+ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, 
_, _]]
+  val filenames = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten
+  filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
 }
 
-// Recover context from checkpoint file and verify whether the files 
that were
-// recorded before failure were saved and successfully recovered
-logInfo("*** RESTARTING ")
-ssc = new StreamingContext(checkpointDir)
-fileInputDStream = 
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+try {
+  // This is a var because it's re-assigned when we restart from a 
checkpoint:
+  var clock: ManualClock = null
+  withStreamingContext(new StreamingContext(conf, batchDuration)) { 
ssc =>
+ssc.checkpoint(checkpointDir)
+clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+val waiter = new StreamingTestWaiter(ssc)
+val fileStream = ssc.textFileStream(testDir.toString)
+// Making value 3 take a large time to process, to ensure that the 
driver
+// shuts down in the middle of processing the 3rd batch
+val mappedStream = fileStream.map(s => {
+  val i = s.toInt
+  if (i == 3) Thread.sleep

[GitHub] spark pull request: [STREAMING] SPARK-4986 Wait for receivers to d...

2015-01-04 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3868#issuecomment-68670827
  
I think I get the high-level idea of what this patch is trying to do. The 
key change is in `ReceiverLauncher`: `stop` is made to use 
the config `spark.streaming.gracefulStopTimeout` for tunability. However, this 
needs to be done carefully so that the behavior of the system is predictable. 

In the existing code, if the timeout is set to 10 seconds, 
context.stop will exit after 10 seconds. So that timeout applies across all the 
different waits (wait for receiver stop, wait for job generation, wait for job 
completion). This makes it easy for a developer to understand what is going 
to happen if he/she sets the configuration to 60 seconds. 

The current patch changes this to use the timeout at every level. The 
problem with that approach is that if the developer sets the timeout to 60 
seconds, the system may take up to 60 * 3 = 180 seconds to stop gracefully. 
That's confusing. 

So we need to wait intelligently, such that the maximum time the whole system 
waits is predictable and easy to understand. The current code already does that, 
but only partially, as the receiver-stop-wait is fixed to 10 seconds and is not 
configurable. Maybe there is a better way of refactoring the code to achieve 
this, and the current patch is not the right way because of the aforementioned 
reasons. 

Here are two approaches. 

1. If processAllReceivedData = true, then `JobScheduler.stop()` calls 
`ReceiverTracker.stop()` and `JobGenerator.stop()` with timeouts that 
depend on how much time has passed since `JobScheduler.stop()` was called. 
The overall timeout is the configurable parameter, and `ReceiverTracker`, 
`ReceiverLauncher`, and `JobGenerator` deal with whatever stop timeout has 
been given to them. 

2. The whole sequence of `stop()`s is evaluated in a `Future` (with its own 
execution context, not the global one), and the configurable timeout is a timed 
wait on the future. This may simplify the code, but introduces another thread in 
the system. Probably okay to do so. 
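
To make the first approach concrete, here is a minimal sketch of a shared deadline. `StopDeadline` and `stopAll` are hypothetical names used only for illustration (they are not part of Spark), and each stage function stands in for a real `stop(timeout)` call. Every stage receives only what is left of the overall budget, so the worst case stays at the configured timeout instead of a multiple of it:

```scala
import scala.concurrent.duration._

// Hypothetical helper (not part of Spark): tracks one overall stop deadline.
// The clock is injectable so the behaviour can be checked without sleeping.
final class StopDeadline(total: FiniteDuration, now: () => Long = () => System.nanoTime()) {
  private val deadlineNanos = now() + total.toNanos

  // Time left in the overall budget, never negative.
  def remaining: FiniteDuration = math.max(0L, deadlineNanos - now()).nanos
}

object DeadlineSketch {
  // Each stage stands in for a stop(timeout) call and is handed only the
  // remaining part of the overall budget, not the full timeout again.
  def stopAll(total: FiniteDuration, stages: Seq[FiniteDuration => Unit]): Unit = {
    val deadline = new StopDeadline(total)
    stages.foreach(stage => stage(deadline.remaining))
  }
}
```

With a 60-second budget, a receiver stop that takes 45 seconds leaves at most 15 seconds for job generation and job completion combined.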

I suggest we try the second approach. What do you think, @cleaton 
@JoshRosen ?
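
A rough sketch of the `Future`-based approach, using only the Scala standard library. `FutureStopSketch` and `stopWithin` are hypothetical names, and the `stopSteps` functions stand in for the real `stop()` calls; this is an illustration under those assumptions, not the actual patch:

```scala
import java.util.concurrent.{Executors, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FutureStopSketch {
  // Runs the whole stop sequence on a dedicated single-thread executor (not the
  // global execution context) and waits at most `timeout` for it to finish.
  // Returns true if all steps completed in time, false on timeout.
  // An exception thrown by a step is rethrown to the caller by Await.result.
  def stopWithin(timeout: FiniteDuration)(stopSteps: Seq[() => Unit]): Boolean = {
    val executor = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
    val allStopped = Future { stopSteps.foreach(step => step()) }
    try {
      Await.result(allStopped, timeout)
      true
    } catch {
      case _: TimeoutException => false
    } finally {
      executor.shutdownNow() // interrupts a step that is still running
    }
  }
}
```

The single timed wait bounds the total stop time regardless of how many steps there are, at the cost of the extra thread mentioned above.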


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5063] Useful error messages for nested ...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3884#discussion_r22447994
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -897,4 +897,23 @@ class RDDSuite extends FunSuite with 
SharedSparkContext {
   mutableDependencies += dep
 }
   }
+
+  test("Nested RDDs are not supported (SPARK-5063)") {
--- End diff --

A quick `git grep` suggests that every suite uses its own style and that 
there's not an obvious dominant style.  I'll just change these tests to the 
lowercase convention to match RDDSuite, but leave the BroadcastSuite ones as-is.





[GitHub] spark pull request: [SPARK-5063] Useful error messages for nested ...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3884#discussion_r22447976
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -76,10 +76,22 @@ import org.apache.spark.util.random.{BernoulliSampler, 
PoissonSampler, Bernoulli
  * on RDD internals.
  */
 abstract class RDD[T: ClassTag](
-@transient private var sc: SparkContext,
+@transient private var _sc: SparkContext,
 @transient private var deps: Seq[Dependency[_]]
   ) extends Serializable with Logging {
 
+  if (classOf[RDD[_]].isAssignableFrom(elementClassTag.runtimeClass)) {
+throw new SparkException("Spark does not support nested RDDs (see 
SPARK-5063)")
+  }
+
+  private def sc: SparkContext = {
+if (_sc == null) {
+  throw new SparkException(
+"Can only define RDDs and perform actions on the driver, not in 
tasks (see SPARK-5063)")
--- End diff --

Sure.  How about this:

> RDD transformations and actions can only be invoked by the driver, not 
inside of other transformations; for example, `rdd1.map(x => 
rdd2.values.count() * x)` is invalid because the `values` transformation and 
`count` action cannot be performed inside of the `rdd1.map` transformation.  
For more information, see SPARK-5063.

Kind of verbose, but I think an example might be the clearest way to 
explain this, esp. to someone unfamiliar with the terminology.

It might be nice to keep the JIRA reference since it will make the 
exception easier to search for (I'm kind of inspired by React.js's error 
messages, which include URL-shortened links to the documentation).
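
For readers unfamiliar with why the `_sc == null` check in the diff catches this case: a `@transient` field is not serialized, so it comes back as `null` once the object is deserialized, which is what happens when an RDD is shipped into a task. A minimal sketch of that mechanism, using hypothetical stand-in classes (`FakeContext`, `FakeRDD`) rather than Spark's real ones:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-ins for SparkContext and RDD; these are not Spark classes.
final class FakeContext(val name: String)

final class FakeRDD(@transient private var _ctx: FakeContext) extends Serializable {
  // Mirrors the guard in the diff: the transient field is null after the
  // object has been serialized and deserialized.
  def ctx: FakeContext = {
    if (_ctx == null) {
      throw new IllegalStateException(
        "Transformations and actions can only be invoked by the driver")
    }
    _ctx
  }
}

object TransientGuardDemo {
  // Serializes and deserializes an object in memory, as a stand-in for
  // shipping it from the driver to a task.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

On the original object `ctx` returns the context; on the round-tripped copy it throws, which is the point where the more descriptive message would be shown.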





[GitHub] spark pull request: [SPARK-4465] runAsSparkUser doesn't affect Tas...

2015-01-04 Thread jongyoul
Github user jongyoul commented on the pull request:

https://github.com/apache/spark/pull/3741#issuecomment-68669781
  
@JoshRosen please, merge this PR. thx.





[GitHub] spark pull request: [SPARK-4233] [SQL] WIP:Simplify the UDAF API (...

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3247#issuecomment-68669578
  
  [Test build #25047 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25047/consoleFull) for PR 3247 at commit [`230e70f`](https://github.com/apache/spark/commit/230e70f5049fa0c0d42c878ecbbb8beb2494b2a1).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-1507][YARN]specify num of cores for AM

2015-01-04 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/3806#issuecomment-68669501
  
@XuTingjun ah, that's correct.  Looking more closely, my confusion was 
stemming from some existing weirdness, which is that setting the 
"spark.driver.memory" property will set the application master memory even in 
client mode.  Also, that passing "--driver-memory" as part of the YARN code's 
ClientArguments (but not to `spark-submit`) will do this as well.

My opinion is that we should fix those, though I suppose it could be argued 
that it breaks backwards compatibility?






[GitHub] spark pull request: [SPARK-5074][Core] Fix a non-deterministic tes...

2015-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/3889





[GitHub] spark pull request: [SPARK-5074][Core] Fix a non-deterministic tes...

2015-01-04 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/3889#issuecomment-68669402
  
Thanks - merged this in master.





[GitHub] spark pull request: [SPARK-5074][Core] Fix a non-deterministic tes...

2015-01-04 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/3889#issuecomment-68669411
  
(Oops Patrick I didn't see your comment earlier)





[GitHub] spark pull request: [SPARK-4465] runAsSparkUser doesn't affect Tas...

2015-01-04 Thread jongyoul
Github user jongyoul commented on the pull request:

https://github.com/apache/spark/pull/3741#issuecomment-68669400
  
+1 @tnachen 





[GitHub] spark pull request: [SPARK-5063] Useful error messages for nested ...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3884#discussion_r22447845
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -897,4 +897,23 @@ class RDDSuite extends FunSuite with 
SharedSparkContext {
   mutableDependencies += dep
 }
   }
+
+  test("Nested RDDs are not supported (SPARK-5063)") {
--- End diff --

It varies from suite-to-suite; most start with lowercase because they start 
with method names.  If you look at BroadcastSuite, though, most use uppercase 
style like I've done here.





[GitHub] spark pull request: [SPARK-4465] runAsSparkUser doesn't affect Tas...

2015-01-04 Thread tnachen
Github user tnachen commented on the pull request:

https://github.com/apache/spark/pull/3741#issuecomment-68669321
  
Patch LGTM.





[GitHub] spark pull request: [SPARK-5062][Graphx] replace mapReduceTriplets...

2015-01-04 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/3883#issuecomment-68669274
  
cc @ankurdave can you take a look at this? Thanks.





[GitHub] spark pull request: [SPARK-1507][YARN]specify num of cores for AM

2015-01-04 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/3806#discussion_r22447830
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 ---
@@ -70,6 +70,8 @@ private[spark] class YarnClientSchedulerBackend(
   List(
 ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
 ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
+("--driver-cores", "SPARK_MASTER_CORES", "spark.yarn.am.cores"),
--- End diff --

No need to add in a deprecated SPARK_MASTER_CORES env var.





[GitHub] spark pull request: [SPARK-5063] Useful error messages for nested ...

2015-01-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3884#discussion_r22447826
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -897,4 +897,23 @@ class RDDSuite extends FunSuite with 
SharedSparkContext {
   mutableDependencies += dep
 }
   }
+
+  test("Nested RDDs are not supported (SPARK-5063)") {
--- End diff --

a nit pick: i don't think we have a standard, but so far test case names 
start with lower case.





[GitHub] spark pull request: SPARK-4226: Add support for subqueries in wher...

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3888#issuecomment-68669185
  
  [Test build #25046 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25046/consoleFull) for PR 3888 at commit [`4019e0d`](https://github.com/apache/spark/commit/4019e0d6e0bf31a123f2817eb964562891211635).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-5063] Useful error messages for nested ...

2015-01-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/3884#discussion_r22447822
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -76,10 +76,22 @@ import org.apache.spark.util.random.{BernoulliSampler, 
PoissonSampler, Bernoulli
  * on RDD internals.
  */
 abstract class RDD[T: ClassTag](
-@transient private var sc: SparkContext,
+@transient private var _sc: SparkContext,
 @transient private var deps: Seq[Dependency[_]]
   ) extends Serializable with Logging {
 
+  if (classOf[RDD[_]].isAssignableFrom(elementClassTag.runtimeClass)) {
+throw new SparkException("Spark does not support nested RDDs (see 
SPARK-5063)")
+  }
+
+  private def sc: SparkContext = {
+if (_sc == null) {
+  throw new SparkException(
+"Can only define RDDs and perform actions on the driver, not in 
tasks (see SPARK-5063)")
--- End diff --

Pointing to a JIRA ticket might not be the most friendly way for users. 
Maybe make it more verbose and explain it in one or two lines?





[GitHub] spark pull request: [SPARK-1507][YARN]specify num of cores for AM

2015-01-04 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/3806#discussion_r22447819
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala ---
@@ -120,6 +121,13 @@ private[spark] class ClientArguments(args: 
Array[String], sparkConf: SparkConf)
   amMemory = value
   args = tail
 
+case ("--master-cores" | "--driver-cores") :: IntParam(value) :: 
tail =>
--- End diff --

no need to add in a deprecated "--master-cores" property
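
A minimal sketch of what the match arm could look like with only the non-deprecated flag. `IntArg` is a hypothetical stand-in for Spark's `IntParam` extractor, and `parseDriverCores` is an illustrative helper, not the real `ClientArguments` code:

```scala
// Hypothetical stand-in for Spark's IntParam extractor.
object IntArg {
  def unapply(s: String): Option[Int] = scala.util.Try(s.toInt).toOption
}

object ArgSketch {
  // Walks the argument list and picks up --driver-cores only; there is no
  // arm for the deprecated --master-cores alias, so it is simply ignored here.
  @annotation.tailrec
  def parseDriverCores(args: List[String], cores: Int = 1): Int = args match {
    case "--driver-cores" :: IntArg(value) :: tail => parseDriverCores(tail, value)
    case _ :: tail => parseDriverCores(tail, cores)
    case Nil => cores
  }
}
```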





[GitHub] spark pull request: SPARK-4226: Add support for subqueries in wher...

2015-01-04 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/3888#issuecomment-68669153
  
Jenkins, test this please.





[GitHub] spark pull request: [SPARK-5083][Core] Fix a flaky test in TaskRes...

2015-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/3894





[GitHub] spark pull request: [SPARK-5074][Core] Fix a non-deterministic tes...

2015-01-04 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/3889#issuecomment-68669041
  
Thanks for looking at this, I'll pull it in.





[GitHub] spark pull request: [SPARK-5083][Core] Fix a flaky test in TaskRes...

2015-01-04 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/3894#issuecomment-68669004
  
Thanks. Merging in master.





[GitHub] spark pull request: [SPARK-5069][Core] Fix the race condition of T...

2015-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/3887


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5069][Core] Fix the race condition of T...

2015-01-04 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/3887#issuecomment-68668868
  
Merging in master. Thanks!





[GitHub] spark pull request: [SPARK-5067][Core] Use '===' to compare well-d...

2015-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/3886





[GitHub] spark pull request: [SPARK-5067][Core] Use '===' to compare well-d...

2015-01-04 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/3886#issuecomment-68668741
  
LGTM. Merging in master. Thanks!





[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3801#discussion_r22447724
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
   // failure, are re-processed or not.
   test("recovery with file input stream") {
 // Set up the streaming context and input streams
+val batchDuration = Seconds(2)  // Due to 1-second resolution of 
setLastModified() on some OS's.
 val testDir = Utils.createTempDir()
-var ssc = new StreamingContext(master, framework, Seconds(1))
-ssc.checkpoint(checkpointDir)
-val fileStream = ssc.textFileStream(testDir.toString)
-// Making value 3 take large time to process, to ensure that the master
-// shuts down in the middle of processing the 3rd batch
-val mappedStream = fileStream.map(s => {
-  val i = s.toInt
-  if (i == 3) Thread.sleep(2000)
-  i
-})
-
-// Reducing over a large window to ensure that recovery from master 
failure
-// requires reprocessing of all the files seen before the failure
-val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), 
Seconds(1))
-val outputBuffer = new ArrayBuffer[Seq[Int]]
-var outputStream = new TestOutputStream(reducedStream, outputBuffer)
-outputStream.register()
-ssc.start()
-
-// Create files and advance manual clock to process them
-// var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-Thread.sleep(1000)
-for (i <- Seq(1, 2, 3)) {
-  Files.write(i + "\n", new File(testDir, i.toString), 
Charset.forName("UTF-8"))
-  // wait to make sure that the file is written such that it gets 
shown in the file listings
-  Thread.sleep(1000)
+val outputBuffer = new ArrayBuffer[Seq[Int]] with 
SynchronizedBuffer[Seq[Int]]
+
+def writeFile(i: Int, clock: ManualClock): Unit = {
+  val file = new File(testDir, i.toString)
+  Files.write(i + "\n", file, Charsets.UTF_8)
+  assert(file.setLastModified(clock.currentTime()))
+  // Check that the file's modification date is actually the value we 
wrote, since rounding or
+  // truncation will break the test:
+  assert(file.lastModified() === clock.currentTime())
 }
-logInfo("Output = " + outputStream.output.mkString(","))
-assert(outputStream.output.size > 0, "No files processed before 
restart")
-ssc.stop()
 
-// Verify whether files created have been recorded correctly or not
-var fileInputDStream = 
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-def recordedFiles = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten
-assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
-
-// Create files while the master is down
-for (i <- Seq(4, 5, 6)) {
-  Files.write(i + "\n", new File(testDir, i.toString), 
Charset.forName("UTF-8"))
-  Thread.sleep(1000)
+def recordedFiles(ssc: StreamingContext): Seq[Int] = {
+  val fileInputDStream =
+ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, 
_, _]]
+  val filenames = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten
+  filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
 }
 
-// Recover context from checkpoint file and verify whether the files 
that were
-// recorded before failure were saved and successfully recovered
-logInfo("*** RESTARTING ")
-ssc = new StreamingContext(checkpointDir)
-fileInputDStream = 
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+try {
+  // This is a var because it's re-assigned when we restart from a 
checkpoint:
+  var clock: ManualClock = null
+  withStreamingContext(new StreamingContext(conf, batchDuration)) { 
ssc =>
+ssc.checkpoint(checkpointDir)
+clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+val waiter = new StreamingTestWaiter(ssc)
+val fileStream = ssc.textFileStream(testDir.toString)
+// Making value 3 take a large time to process, to ensure that the 
driver
+// shuts down in the middle of processing the 3rd batch
+val mappedStream = fileStream.map(s => {
+  val i = s.toInt
+  if (i == 3) Thread.

[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3801#discussion_r22447733
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---
@@ -104,6 +106,77 @@ class TestOutputStreamWithPartitions[T: 
ClassTag](parent: DStream[T],
 }
 
 /**
+ * This is an interface that can be used to block until certain events 
occur, such as
--- End diff --

I'll fix this up, along with my "two spaces after periods" style in the 
block comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3801#discussion_r22447721
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
   // failure, are re-processed or not.
   test("recovery with file input stream") {
 // Set up the streaming context and input streams
+val batchDuration = Seconds(2)  // Due to 1-second resolution of 
setLastModified() on some OS's.
 val testDir = Utils.createTempDir()
-var ssc = new StreamingContext(master, framework, Seconds(1))
-ssc.checkpoint(checkpointDir)
-val fileStream = ssc.textFileStream(testDir.toString)
-// Making value 3 take large time to process, to ensure that the master
-// shuts down in the middle of processing the 3rd batch
-val mappedStream = fileStream.map(s => {
-  val i = s.toInt
-  if (i == 3) Thread.sleep(2000)
-  i
-})
-
-// Reducing over a large window to ensure that recovery from master 
failure
-// requires reprocessing of all the files seen before the failure
-val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), 
Seconds(1))
-val outputBuffer = new ArrayBuffer[Seq[Int]]
-var outputStream = new TestOutputStream(reducedStream, outputBuffer)
-outputStream.register()
-ssc.start()
-
-// Create files and advance manual clock to process them
-// var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-Thread.sleep(1000)
-for (i <- Seq(1, 2, 3)) {
-  Files.write(i + "\n", new File(testDir, i.toString), 
Charset.forName("UTF-8"))
-  // wait to make sure that the file is written such that it gets 
shown in the file listings
-  Thread.sleep(1000)
+val outputBuffer = new ArrayBuffer[Seq[Int]] with 
SynchronizedBuffer[Seq[Int]]
+
+def writeFile(i: Int, clock: ManualClock): Unit = {
+  val file = new File(testDir, i.toString)
+  Files.write(i + "\n", file, Charsets.UTF_8)
+  assert(file.setLastModified(clock.currentTime()))
+  // Check that the file's modification date is actually the value we 
wrote, since rounding or
+  // truncation will break the test:
+  assert(file.lastModified() === clock.currentTime())
 }
-logInfo("Output = " + outputStream.output.mkString(","))
-assert(outputStream.output.size > 0, "No files processed before 
restart")
-ssc.stop()
 
-// Verify whether files created have been recorded correctly or not
-var fileInputDStream = 
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-def recordedFiles = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten
-assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
-
-// Create files while the master is down
-for (i <- Seq(4, 5, 6)) {
-  Files.write(i + "\n", new File(testDir, i.toString), 
Charset.forName("UTF-8"))
-  Thread.sleep(1000)
+def recordedFiles(ssc: StreamingContext): Seq[Int] = {
+  val fileInputDStream =
+ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, 
_, _]]
+  val filenames = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten
+  filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
 }
 
-// Recover context from checkpoint file and verify whether the files 
that were
-// recorded before failure were saved and successfully recovered
-logInfo("*** RESTARTING ")
-ssc = new StreamingContext(checkpointDir)
-fileInputDStream = 
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+try {
+  // This is a var because it's re-assigned when we restart from a 
checkpoint:
+  var clock: ManualClock = null
+  withStreamingContext(new StreamingContext(conf, batchDuration)) { 
ssc =>
+ssc.checkpoint(checkpointDir)
+clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+val waiter = new StreamingTestWaiter(ssc)
+val fileStream = ssc.textFileStream(testDir.toString)
+// Making value 3 take a large time to process, to ensure that the 
driver
+// shuts down in the middle of processing the 3rd batch
+val mappedStream = fileStream.map(s => {
+  val i = s.toInt
+  if (i == 3) Thread.

[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3801#discussion_r22447710
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
   // failure, are re-processed or not.
   test("recovery with file input stream") {
 // Set up the streaming context and input streams
+val batchDuration = Seconds(2)  // Due to 1-second resolution of 
setLastModified() on some OS's.
 val testDir = Utils.createTempDir()
-var ssc = new StreamingContext(master, framework, Seconds(1))
-ssc.checkpoint(checkpointDir)
-val fileStream = ssc.textFileStream(testDir.toString)
-// Making value 3 take large time to process, to ensure that the master
-// shuts down in the middle of processing the 3rd batch
-val mappedStream = fileStream.map(s => {
-  val i = s.toInt
-  if (i == 3) Thread.sleep(2000)
-  i
-})
-
-// Reducing over a large window to ensure that recovery from master 
failure
-// requires reprocessing of all the files seen before the failure
-val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), 
Seconds(1))
-val outputBuffer = new ArrayBuffer[Seq[Int]]
-var outputStream = new TestOutputStream(reducedStream, outputBuffer)
-outputStream.register()
-ssc.start()
-
-// Create files and advance manual clock to process them
-// var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-Thread.sleep(1000)
-for (i <- Seq(1, 2, 3)) {
-  Files.write(i + "\n", new File(testDir, i.toString), 
Charset.forName("UTF-8"))
-  // wait to make sure that the file is written such that it gets 
shown in the file listings
-  Thread.sleep(1000)
+val outputBuffer = new ArrayBuffer[Seq[Int]] with 
SynchronizedBuffer[Seq[Int]]
+
+def writeFile(i: Int, clock: ManualClock): Unit = {
+  val file = new File(testDir, i.toString)
+  Files.write(i + "\n", file, Charsets.UTF_8)
+  assert(file.setLastModified(clock.currentTime()))
+  // Check that the file's modification date is actually the value we 
wrote, since rounding or
+  // truncation will break the test:
+  assert(file.lastModified() === clock.currentTime())
 }
-logInfo("Output = " + outputStream.output.mkString(","))
-assert(outputStream.output.size > 0, "No files processed before 
restart")
-ssc.stop()
 
-// Verify whether files created have been recorded correctly or not
-var fileInputDStream = 
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-def recordedFiles = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten
-assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
-
-// Create files while the master is down
-for (i <- Seq(4, 5, 6)) {
-  Files.write(i + "\n", new File(testDir, i.toString), 
Charset.forName("UTF-8"))
-  Thread.sleep(1000)
+def recordedFiles(ssc: StreamingContext): Seq[Int] = {
+  val fileInputDStream =
+ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, 
_, _]]
+  val filenames = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten
+  filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
 }
 
-// Recover context from checkpoint file and verify whether the files 
that were
-// recorded before failure were saved and successfully recovered
-logInfo("*** RESTARTING ")
-ssc = new StreamingContext(checkpointDir)
-fileInputDStream = 
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+try {
+  // This is a var because it's re-assigned when we restart from a 
checkpoint:
+  var clock: ManualClock = null
+  withStreamingContext(new StreamingContext(conf, batchDuration)) { 
ssc =>
+ssc.checkpoint(checkpointDir)
+clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+val waiter = new StreamingTestWaiter(ssc)
+val fileStream = ssc.textFileStream(testDir.toString)
+// Making value 3 take a large time to process, to ensure that the 
driver
+// shuts down in the middle of processing the 3rd batch
+val mappedStream = fileStream.map(s => {
+  val i = s.toInt
+  if (i == 3) Thread.

[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3801#discussion_r22447683
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
   // failure, are re-processed or not.
   test("recovery with file input stream") {
 // Set up the streaming context and input streams
+val batchDuration = Seconds(2)  // Due to 1-second resolution of 
setLastModified() on some OS's.
 val testDir = Utils.createTempDir()
-var ssc = new StreamingContext(master, framework, Seconds(1))
-ssc.checkpoint(checkpointDir)
-val fileStream = ssc.textFileStream(testDir.toString)
-// Making value 3 take large time to process, to ensure that the master
-// shuts down in the middle of processing the 3rd batch
-val mappedStream = fileStream.map(s => {
-  val i = s.toInt
-  if (i == 3) Thread.sleep(2000)
-  i
-})
-
-// Reducing over a large window to ensure that recovery from master 
failure
-// requires reprocessing of all the files seen before the failure
-val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), 
Seconds(1))
-val outputBuffer = new ArrayBuffer[Seq[Int]]
-var outputStream = new TestOutputStream(reducedStream, outputBuffer)
-outputStream.register()
-ssc.start()
-
-// Create files and advance manual clock to process them
-// var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-Thread.sleep(1000)
-for (i <- Seq(1, 2, 3)) {
-  Files.write(i + "\n", new File(testDir, i.toString), 
Charset.forName("UTF-8"))
-  // wait to make sure that the file is written such that it gets 
shown in the file listings
-  Thread.sleep(1000)
+val outputBuffer = new ArrayBuffer[Seq[Int]] with 
SynchronizedBuffer[Seq[Int]]
+
+def writeFile(i: Int, clock: ManualClock): Unit = {
+  val file = new File(testDir, i.toString)
+  Files.write(i + "\n", file, Charsets.UTF_8)
+  assert(file.setLastModified(clock.currentTime()))
+  // Check that the file's modification date is actually the value we 
wrote, since rounding or
+  // truncation will break the test:
+  assert(file.lastModified() === clock.currentTime())
 }
-logInfo("Output = " + outputStream.output.mkString(","))
-assert(outputStream.output.size > 0, "No files processed before 
restart")
-ssc.stop()
 
-// Verify whether files created have been recorded correctly or not
-var fileInputDStream = 
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-def recordedFiles = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten
-assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
-
-// Create files while the master is down
-for (i <- Seq(4, 5, 6)) {
-  Files.write(i + "\n", new File(testDir, i.toString), 
Charset.forName("UTF-8"))
-  Thread.sleep(1000)
+def recordedFiles(ssc: StreamingContext): Seq[Int] = {
+  val fileInputDStream =
+ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, 
_, _]]
+  val filenames = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten
+  filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
 }
 
-// Recover context from checkpoint file and verify whether the files 
that were
-// recorded before failure were saved and successfully recovered
-logInfo("*** RESTARTING ")
-ssc = new StreamingContext(checkpointDir)
-fileInputDStream = 
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+try {
+  // This is a var because it's re-assigned when we restart from a 
checkpoint:
+  var clock: ManualClock = null
+  withStreamingContext(new StreamingContext(conf, batchDuration)) { 
ssc =>
+ssc.checkpoint(checkpointDir)
+clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+val waiter = new StreamingTestWaiter(ssc)
+val fileStream = ssc.textFileStream(testDir.toString)
+// Making value 3 take a large time to process, to ensure that the 
driver
+// shuts down in the middle of processing the 3rd batch
+val mappedStream = fileStream.map(s => {
+  val i = s.toInt
+  if (i == 3) Thread.

[GitHub] spark pull request: Add hadoop-2.5 profile with upgraded jets3t

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/3654#issuecomment-68668159
  
@ZhangBanger Did you end up trying out the Hadoop 2.4 profile?  Did it work 
or do we need to do further work for this?





[GitHub] spark pull request: [Core][ SPARK-5065]Fix: BroadCast can still wo...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/3885#issuecomment-68668099
  
This is a nice fix, but it would be nice to include a test case for this.  
This is probably going to merge-conflict with my PR #3884, so I may end up 
incorporating this change into that PR.





[GitHub] spark pull request: [SPARK-5063] Useful error messages for nested ...

2015-01-04 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/3884#issuecomment-68668004
  
Haha, the `org.apache.spark.broadcast.BroadcastSuite.Using broadcast after 
destroy prints callsite` test actually broadcasts an RDD (which is invalid), 
which is what caused that test failure.  I'll fix this up in my next commit.





[GitHub] spark pull request: [STREAMING] SPARK-4986 Wait for receivers to d...

2015-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3868#discussion_r22447509
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/util/TimeoutUtils.scala ---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+
+private[streaming] object TimeoutUtils {
+  /**
+   * Waiting function with a timeout.
+   * Returns true if done, false for timeout
+   */
+  def waitUntilDone(timeout:Long, done:() => Boolean): Boolean = {
--- End diff --

As I mentioned in another comment, we can either use the `Clock` trait's 
`waitForTime` or, if we want to implement something more generic like this (say 
`waitForCondition`), it might be better to implement it in a proper class like 
`o.a.s.streaming.utils.ConditionWaiter` rather than in something like 
`TimeoutUtils`, which will probably only ever have this one method.
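
A rough sketch of what such a generic waiter could look like, purely for 
illustration. The class name `ConditionWaiter`, the `waitForCondition` 
signature and the `pollIntervalMs` parameter are assumptions made here; none 
of them exist in Spark, and this version polls against wall-clock time rather 
than the streaming `Clock`.

```scala
// Hypothetical sketch: ConditionWaiter is not an existing Spark class.
class ConditionWaiter(pollIntervalMs: Long = 100L) {

  /**
   * Polls `done` until it returns true or `timeoutMs` has elapsed.
   * Returns true if the condition was met, false on timeout.
   */
  def waitForCondition(timeoutMs: Long)(done: () => Boolean): Boolean = {
    val start = System.nanoTime()
    val timeoutNs = timeoutMs * 1000000L
    var finished = done()
    // Compare elapsed time (not absolute timestamps) to stay safe against
    // nanoTime wrap-around.
    while (!finished && System.nanoTime() - start < timeoutNs) {
      Thread.sleep(pollIntervalMs)
      finished = done()
    }
    finished
  }
}
```

With something like this, the call site in `JobGenerator` would pass 
`stopTimeout` and a closure that checks for unallocated blocks.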





[GitHub] spark pull request: [STREAMING] SPARK-4986 Wait for receivers to d...

2015-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3868#discussion_r22447455
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
 ---
@@ -93,27 +93,18 @@ class JobGenerator(jobScheduler: JobScheduler) extends 
Logging {
 
 if (processReceivedData) {
   logInfo("Stopping JobGenerator gracefully")
-  val timeWhenStopStarted = System.currentTimeMillis()
   val stopTimeout = conf.getLong(
 "spark.streaming.gracefulStopTimeout",
 10 * ssc.graph.batchDuration.milliseconds
   )
-  val pollTime = 100
-
-  // To prevent graceful stop to get stuck permanently
-  def hasTimedOut = {
-val timedOut = System.currentTimeMillis() - timeWhenStopStarted > 
stopTimeout
-if (timedOut) {
-  logWarning("Timed out while stopping the job generator (timeout 
= " + stopTimeout + ")")
-}
-timedOut
-  }
+
 
   // Wait until all the received blocks in the network input tracker 
has
   // been consumed by network input DStreams, and jobs have been 
generated with them
   logInfo("Waiting for all received blocks to be consumed for job 
generation")
-  while(!hasTimedOut && 
jobScheduler.receiverTracker.hasUnallocatedBlocks) {
-Thread.sleep(pollTime)
+  if (!TimeoutUtils.waitUntilDone(stopTimeout,
--- End diff --

Actually, instead of this utility, it might be a better idea to use the 
`clock` in this class, which has the `waitForTime` method. Using that clock 
also lets us control time in unit tests with a manual clock, which might help 
you implement a unit test for this.
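
To illustrate the idea, here is a minimal, self-contained sketch of a 
clock-driven wait. The `SketchClock` and `SketchManualClock` types below are 
stand-ins written for this example; they only mimic the 
`currentTime`/`waitForTime` shape mentioned above and are not Spark's actual 
classes. `waitUntilDone` and its `pollMs` parameter are likewise assumptions.

```scala
// Hypothetical stand-in for a clock abstraction; not Spark's Clock trait.
trait SketchClock {
  def currentTime(): Long
  def waitForTime(targetTime: Long): Long
}

// A manually advanced clock: time only moves when addToTime is called,
// so a test decides exactly when a timeout is reached.
class SketchManualClock(private var time: Long = 0L) extends SketchClock {
  def currentTime(): Long = synchronized { time }

  def addToTime(delta: Long): Unit = synchronized {
    time += delta
    notifyAll() // wake up threads blocked in waitForTime
  }

  def waitForTime(targetTime: Long): Long = synchronized {
    while (time < targetTime) wait(100)
    time
  }
}

// Waits until `done` is true or `timeoutMs` of *clock* time has passed.
def waitUntilDone(clock: SketchClock, timeoutMs: Long, pollMs: Long)(
    done: () => Boolean): Boolean = {
  val deadline = clock.currentTime() + timeoutMs
  var finished = done()
  while (!finished && clock.currentTime() < deadline) {
    // Never wait past the deadline, so the timeout is observed promptly.
    clock.waitForTime(math.min(clock.currentTime() + pollMs, deadline))
    finished = done()
  }
  finished
}
```

Because the loop only ever consults the injected clock, a test can advance a 
manual clock from another thread and observe the timeout path without 
sleeping for the real timeout duration.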





[GitHub] spark pull request: [SPARK-4835] Disable validateOutputSpecs for S...

2015-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/3832





[GitHub] spark pull request: [SPARK-4835] Disable validateOutputSpecs for S...

2015-01-04 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3832#issuecomment-68666982
  
There are a few nits, but this LGTM even without them being addressed. I am 
merging this. Thanks for finding and solving this!





[GitHub] spark pull request: [SPARK-4835] Disable validateOutputSpecs for S...

2015-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3832#discussion_r22447244
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -255,6 +255,45 @@ class CheckpointSuite extends TestSuiteBase {
 }
   }
 
+  test("recovery with saveAsHadoopFile inside transform operation") {
+// Regression test for SPARK-4835.
+//
+// In that issue, the problem was that `saveAsHadoopFile(s)` would 
fail when the last batch
+// was restarted from a checkpoint since the output directory would 
already exist.  However,
+// the other saveAsHadoopFile* tests couldn't catch this because they 
only tested whether the
+// output matched correctly and not whether the post-restart batch had 
successfully finished
+// without throwing any errors.  The following test reproduces the 
same bug with a test that
+// actually fails because the error in saveAsHadoopFile causes 
transform() to fail, which
+// prevents the expected output from being written to the output 
stream.
+//
+// This is not actually a valid use of transform, but it's being used 
here so that we can test
+// the fix for SPARK-4835 independently of additional test cleanup.
+//
+// After SPARK-5079 is addressed, should be able to remove this test 
since a strengthened
+// version of the other saveAsHadoopFile* tests would prevent 
regressions for this issue.
+val tempDir = Files.createTempDir()
+try {
+  testCheckpointedOperation(
+Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), 
Seq("", ""), Seq()),
+(s: DStream[String]) => {
+  s.transform { (rdd, time) =>
+val output = rdd.map(x => (x, 1)).reduceByKey(_ + _)
+output.saveAsHadoopFile(
+  new File(tempDir, "result-" + 
time.milliseconds).getAbsolutePath,
+  classOf[Text],
--- End diff --

No, it isn't, sorry.





[GitHub] spark pull request: [SPARK-4835] Disable validateOutputSpecs for S...

2015-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3832#discussion_r22447232
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -255,6 +255,45 @@ class CheckpointSuite extends TestSuiteBase {
 }
   }
 
+  test("recovery with saveAsHadoopFile inside transform operation") {
+// Regression test for SPARK-4835.
+//
+// In that issue, the problem was that `saveAsHadoopFile(s)` would 
fail when the last batch
+// was restarted from a checkpoint since the output directory would 
already exist.  However,
+// the other saveAsHadoopFile* tests couldn't catch this because they 
only tested whether the
+// output matched correctly and not whether the post-restart batch had 
successfully finished
+// without throwing any errors.  The following test reproduces the 
same bug with a test that
+// actually fails because the error in saveAsHadoopFile causes 
transform() to fail, which
+// prevents the expected output from being written to the output 
stream.
+//
+// This is not actually a valid use of transform, but it's being used 
here so that we can test
+// the fix for SPARK-4835 independently of additional test cleanup.
+//
+// After SPARK-5079 is addressed, should be able to remove this test 
since a strengthened
+// version of the other saveAsHadoopFile* tests would prevent 
regressions for this issue.
+val tempDir = Files.createTempDir()
+try {
+  testCheckpointedOperation(
+Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), 
Seq("", ""), Seq()),
+(s: DStream[String]) => {
+  s.transform { (rdd, time) =>
+val output = rdd.map(x => (x, 1)).reduceByKey(_ + _)
+output.saveAsHadoopFile(
+  new File(tempDir, "result-" + 
time.milliseconds).getAbsolutePath,
+  classOf[Text],
+  classOf[IntWritable],
+  classOf[TextOutputFormat[Text, IntWritable]])
--- End diff --

This will be easier to read if the `new File` is assigned to a variable, and 
maybe if the `saveAsHadoopFile` call then fits on two lines.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4835] Disable validateOutputSpecs for S...

2015-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3832#discussion_r22447211
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -255,6 +255,45 @@ class CheckpointSuite extends TestSuiteBase {
 }
   }
 
+  test("recovery with saveAsHadoopFile inside transform operation") {
+// Regression test for SPARK-4835.
+//
+// In that issue, the problem was that `saveAsHadoopFile(s)` would 
fail when the last batch
+// was restarted from a checkpoint since the output directory would 
already exist.  However,
+// the other saveAsHadoopFile* tests couldn't catch this because they 
only tested whether the
+// output matched correctly and not whether the post-restart batch had 
successfully finished
+// without throwing any errors.  The following test reproduces the 
same bug with a test that
+// actually fails because the error in saveAsHadoopFile causes 
transform() to fail, which
+// prevents the expected output from being written to the output 
stream.
+//
+// This is not actually a valid use of transform, but it's being used 
here so that we can test
+// the fix for SPARK-4835 independently of additional test cleanup.
+//
+// After SPARK-5079 is addressed, should be able to remove this test 
since a strengthened
+// version of the other saveAsHadoopFile* tests would prevent 
regressions for this issue.
+val tempDir = Files.createTempDir()
+try {
+  testCheckpointedOperation(
+Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), 
Seq("", ""), Seq()),
+(s: DStream[String]) => {
+  s.transform { (rdd, time) =>
+val output = rdd.map(x => (x, 1)).reduceByKey(_ + _)
+output.saveAsHadoopFile(
+  new File(tempDir, "result-" + 
time.milliseconds).getAbsolutePath,
+  classOf[Text],
--- End diff --

Scala style: isn't the indentation wrong here?





[GitHub] spark pull request: [SPARK-4835] Disable validateOutputSpecs for S...

2015-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3832#discussion_r22447201
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -255,6 +255,45 @@ class CheckpointSuite extends TestSuiteBase {
 }
   }
 
+  test("recovery with saveAsHadoopFile inside transform operation") {
+// Regression test for SPARK-4835.
+//
+// In that issue, the problem was that `saveAsHadoopFile(s)` would 
fail when the last batch
+// was restarted from a checkpoint since the output directory would 
already exist.  However,
+// the other saveAsHadoopFile* tests couldn't catch this because they 
only tested whether the
+// output matched correctly and not whether the post-restart batch had 
successfully finished
+// without throwing any errors.  The following test reproduces the 
same bug with a test that
--- End diff --

nit: extra space before `The following`





[GitHub] spark pull request: [SPARK-1507][YARN]specify num of cores for AM

2015-01-04 Thread XuTingjun
Github user XuTingjun commented on the pull request:

https://github.com/apache/spark/pull/3806#issuecomment-6857
  
@andrewor14





[GitHub] spark pull request: [SPARK-4835] Disable validateOutputSpecs for S...

2015-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3832#discussion_r22447198
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -255,6 +255,45 @@ class CheckpointSuite extends TestSuiteBase {
 }
   }
 
+  test("recovery with saveAsHadoopFile inside transform operation") {
+// Regression test for SPARK-4835.
+//
+// In that issue, the problem was that `saveAsHadoopFile(s)` would fail when the last batch
+// was restarted from a checkpoint since the output directory would already exist.  However,
--- End diff --

nit: extra space before `However`





[GitHub] spark pull request: [SPARK-5083][Core] Fix a flaky test in TaskRes...

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3894#issuecomment-68666590
  
  [Test build #25044 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25044/consoleFull) for PR 3894 at commit [`d97c03d`](https://github.com/apache/spark/commit/d97c03de08e1f12f8df28812c1218ce681d2cf3a).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-5083][Core] Fix a flaky test in TaskRes...

2015-01-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3894#issuecomment-68666593
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25044/
Test PASSed.





[GitHub] spark pull request: [SPARK-5036][Graphx]Better support sending par...

2015-01-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3866#issuecomment-68666185
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25045/
Test FAILed.





[GitHub] spark pull request: [SPARK-5036][Graphx]Better support sending par...

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3866#issuecomment-68666181
  
  [Test build #25045 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25045/consoleFull) for PR 3866 at commit [`1cb8770`](https://github.com/apache/spark/commit/1cb8770aecf4634afe31762f93a295d14cfc14ab).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-4465] runAsSparkUser doesn't affect Tas...

2015-01-04 Thread jongyoul
Github user jongyoul commented on the pull request:

https://github.com/apache/spark/pull/3741#issuecomment-68665839
  
I deleted my old fork and re-forked the repo for another patch. If that
causes any problem with merging this PR into master, I can create another PR
with the same changes.





[GitHub] spark pull request: [SPARK-5003][SQL]cast support date data type

2015-01-04 Thread DoingDone9
Github user DoingDone9 commented on the pull request:

https://github.com/apache/spark/pull/3839#issuecomment-68665581
  
lgtm





[GitHub] spark pull request: [SPARK-5011][SQL] Add support for WITH SERDEPR...

2015-01-04 Thread OopsOutOfMemory
Github user OopsOutOfMemory commented on a diff in the pull request:

https://github.com/apache/spark/pull/3847#discussion_r22446919
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala ---
@@ -70,13 +73,33 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
* CREATE TEMPORARY TABLE avroTable
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
+   * OR,
+   * For other external datasources not only a kind of file like:avro, parquet, json, but a cluster database, like: cassandra an hbase etc...
--- End diff --

@tianyi Thanks for your comments





[GitHub] spark pull request: [SPARK-5011][SQL] Add support for WITH SERDEPR...

2015-01-04 Thread OopsOutOfMemory
Github user OopsOutOfMemory commented on a diff in the pull request:

https://github.com/apache/spark/pull/3847#discussion_r22446914
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala ---
@@ -70,13 +73,33 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
* CREATE TEMPORARY TABLE avroTable
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
+   * OR,
+   * For other external datasources not only a kind of file like:avro, parquet, json, but a cluster database, like: cassandra an hbase etc...
--- End diff --

Thanks for your comments :)





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/3844





[GitHub] spark pull request: [SPARK-4964] [Streaming] Exactly-once semantic...

2015-01-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3798#discussion_r22446804
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.annotation.tailrec
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.kafka.{KafkaCluster, KafkaRDD}
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
+  * each given Kafka topic/partition corresponds to an RDD partition.
+  * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
+  * per second that each '''partition''' will accept.
+  * Starting offsets are specified in advance,
+  * and this DStream is not responsible for committing offsets,
+  * so that you can control exactly-once semantics.
+  * For an easy interface to Kafka-managed offsets,
+  *  see {@link org.apache.spark.rdd.kafka.KafkaCluster}
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters</a>.
+  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+  *  starting point of the stream
+  * @param messageHandler function for translating each message into the desired type
+  * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
+  */
+class DeterministicKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+@transient ssc_ : StreamingContext,
+val kafkaParams: Map[String, String],
+val fromOffsets: Map[TopicAndPartition, Long],
+messageHandler: MessageAndMetadata[K, V] => R,
+maxRetries: Int = 1
+) extends InputDStream[R](ssc_) with Logging {
+
+  private val kc = new KafkaCluster(kafkaParams)
+
+  private val maxMessagesPerPartition: Option[Long] = {
+val ratePerSec = ssc.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
--- End diff --

I'm not sure specifically what you mean by "window operations that require
past data which needs to be pulled from Kafka every time". The current
KafkaRDD code logs a message every time compute() is called on the RDD to
pull data from Kafka, and for a job with a window operation, I only see
compute called once for a given offset range, not repeatedly every time.

Regarding the bigger question of how this approach stacks up against the two
existing approaches: they're all different approaches with different
tradeoffs, and I don't think one has to win. I'd still have a use for the
original receiver-based class (not the WAL one), especially if SPARK-3146
or SPARK-4960 ever gets merged.

On Sat, Jan 3, 2015 at 8:57 PM, Tathagata Das wrote:

> In external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala:
>
> > +  K: ClassTag,
> > +  V: ClassTag,
> > +  U <: Decoder[_]: ClassTag,
> > +  T <: Decoder[_]: ClassTag,
> > +  R: ClassTag](
> > +@transient ssc_ : StreamingContext,
> > +val kafkaParams: Map[String, String],
> > +val fromOffsets: Map[TopicAndPartition, Long],
> > +messageHandler: MessageAndMetadata[K, V] => R,
 

[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-04 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68665022
  
Merging this, thanks!





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22446708
  
--- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---
@@ -17,31 +17,111 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Milliseconds(500)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val freePort = findFreePort()
+  private val brokerUri = "//localhost:" + freePort
+  private val topic = "def"
+  private var ssc: StreamingContext = _
+  private val persistenceDir = Utils.createTempDir()
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _
 
-  test("mqtt input stream") {
-val ssc = new StreamingContext(master, framework, batchDuration)
-val brokerUrl = "abc"
-val topic = "def"
+  before {
+ssc = new StreamingContext(master, framework, batchDuration)
+setupMQTT()
+  }
 
-// tests the API, does not actually test data receiving
-val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-val test2: ReceiverInputDStream[String] =
-  MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+  after {
+if (ssc != null) {
+  ssc.stop()
+  ssc = null
+}
+Utils.deleteRecursively(persistenceDir)
+tearDownMQTT()
+  }
 
-// TODO: Actually test receiving data
+  test("mqtt input stream") {
+val sendMessage = "MQTT demo for spark streaming"
+val receiveStream: ReceiverInputDStream[String] =
+  MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+var receiveMessage: List[String] = List()
+receiveStream.foreachRDD { rdd =>
+  if (rdd.collect.length > 0) {
+receiveMessage = receiveMessage ::: List(rdd.first)
+receiveMessage
+  }
+}
+ssc.start()
+publishData(sendMessage)
+eventually(timeout(1 milliseconds), interval(100 milliseconds)) {
+  assert(sendMessage.equals(receiveMessage(0)))
+}
 ssc.stop()
   }
+
+  private def setupMQTT() {
+broker = new BrokerService()
+connector = new TransportConnector()
+connector.setName("mqtt")
+connector.setUri(new URI("mqtt:" + brokerUri))
+broker.addConnector(connector)
+broker.start()
+  }
+
+  private def tearDownMQTT() {
+if (broker != null) {
+  broker.stop()
+  broker = null
+}
+if (connector != null) {
+  connector.stop()
+  connector = null
+}
+  }
+
+  private def findFreePort(): Int = {
+Utils.startServiceOnPort(23456, (trialPort: Int) => {
+  val socket = new ServerSocket(trialPort)
+  socket.close()
+  (null, trialPort)
+})._2
+  }
+
+  def publishData(data: String): Unit = {
+var client: MqttClient = null
+try {
+  val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+  client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
+  client.connect()
+  if (client.isConnected) {
+val msgTopic: MqttTopic = client.getTopic(topic)
+val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+message.setQos(1)
+message.setRetained(true)
+for (i <- 0 to 100)
+  msgTopic.publish(message)
--- End diff --

nit: I missed this in the last pass, but this violates the Scala style
that we follow. I won't block this PR for this.
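
For readers of the archive, a minimal sketch of the form this nit presumably asks for. The comment does not say which rule is violated; the assumption here is that it refers to the convention of putting curly braces around loop bodies, even single-statement ones. `publish` is a hypothetical stand-in for `msgTopic.publish(message)` so the sketch runs without an MQTT broker.

```scala
import scala.collection.mutable.ArrayBuffer

object BraceStyleSketch {
  // Hypothetical stand-in for msgTopic.publish(message): records each call.
  val published: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def publish(message: String): Unit = {
    published += message
  }

  def publishAll(message: String): Unit = {
    // Same loop bounds as in the diff, with the body wrapped in braces
    // even though it is a single statement.
    for (i <- 0 to 100) {
      publish(message)
    }
  }

  def main(args: Array[String]): Unit = {
    publishAll("MQTT demo for spark streaming")
    println(s"published ${published.size} messages")
  }
}
```

Note that `0 to 100` is inclusive on both ends, so the loop body runs 101 times.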



[GitHub] spark pull request: [SPARK-4229] Create hadoop configuration in a ...

2015-01-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3543#discussion_r22446683
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala ---
@@ -789,7 +790,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   keyClass: Class[_],
   valueClass: Class[_],
   outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-  conf: Configuration = new Configuration) {
--- End diff --

Based on what Marcelo Vanzin said on the dev list when I brought this issue
up, the only reason the problem was still around for me to run into is
because he changed some of the uses of new Configuration but not all of
them.

I agree it's used in a lot of different places, but I'm not sure how
piecemeal fixes to only some of the places are helpful to users. Were there
still specific concerns about particular classes?
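
To make the concern concrete, here is a minimal sketch using hypothetical stand-in classes: `Conf` plays the role of Hadoop's `Configuration` and `Context` the role of a SparkContext-like holder. Neither is a Spark or Hadoop API, and the key name is made up. It only illustrates why a default argument of `new Conf` can silently miss settings the user placed on the context, while defaulting to the context's configuration keeps them.

```scala
object ConfDefaultSketch {
  // Hypothetical stand-in for org.apache.hadoop.conf.Configuration.
  final class Conf(val settings: Map[String, String] = Map.empty) {
    def get(key: String): Option[String] = settings.get(key)
  }

  // Hypothetical stand-in for a context carrying user-supplied settings.
  final class Context(val hadoopConf: Conf)

  final class Stream(ctx: Context) {
    // Old pattern: the default is a fresh Conf, so context settings are not seen.
    def saveWithFreshDefault(conf: Conf = new Conf): Option[String] =
      conf.get("fs.custom.key")

    // Proposed pattern: the default is the configuration held by the context.
    def saveWithContextDefault(conf: Conf = ctx.hadoopConf): Option[String] =
      conf.get("fs.custom.key")
  }

  def main(args: Array[String]): Unit = {
    val ctx = new Context(new Conf(Map("fs.custom.key" -> "value-from-context")))
    val stream = new Stream(ctx)
    println(stream.saveWithFreshDefault())   // setting is missing
    println(stream.saveWithContextDefault()) // setting is picked up
  }
}
```

If only some call sites switch to the second pattern, a user still hits the first pattern wherever it remains, which is the piecemeal-fix problem described above.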

On Sun, Jan 4, 2015 at 6:28 AM, Tathagata Das wrote:

> In streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala:
>
> > @@ -789,7 +790,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
> >keyClass: Class[_],
> >valueClass: Class[_],
> >outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
> > -  conf: Configuration = new Configuration) {
>
> The scope of this PR is pretty wide in terms of the number of classes it
> touches, causing issues as different places need to be handled
> differently. If you considered moving this sort of change (new
> Configuration to sparkContext.hadoopConfiguration) into a different PR,
> that might be easier to get in.





[GitHub] spark pull request: [SPARK-5062][Graphx] replace mapReduceTriplets...

2015-01-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3883#issuecomment-68664502
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25041/
Test FAILed.





[GitHub] spark pull request: [SPARK-5062][Graphx] replace mapReduceTriplets...

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3883#issuecomment-68664498
  
  [Test build #25041 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25041/consoleFull) for PR 3883 at commit [`bd8c438`](https://github.com/apache/spark/commit/bd8c438517dfe67126b0d6738c1954a2e75351ff).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-4233] [SQL] WIP:Simplify the UDAF API (...

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3247#issuecomment-68663862
  
  [Test build #25043 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25043/consoleFull) for PR 3247 at commit [`74945d5`](https://github.com/apache/spark/commit/74945d50cb7fcc837afaf920ea559613a373356a).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class UnresolvedFunction(`
  * `trait AggregateFunction `
  * `trait AggregateExpression extends Expression `
  * `abstract class UnaryAggregateExpression extends UnaryExpression with AggregateExpression `
  * `case class Min(`
  * `case class Average(child: Expression, distinct: Boolean = false)`
  * `case class Max(child: Expression, distinct: Boolean = false)`
  * `case class Count(child: Expression)`
  * `case class CountDistinct(children: Seq[Expression])`
  * `case class Sum(child: Expression, distinct: Boolean = false)`
  * `case class First(child: Expression, distinct: Boolean = false)`
  * `case class Last(child: Expression, distinct: Boolean = false)`
  * `case class MinFunction(aggr: BoundReference, base: Min) extends AggregateFunction `
  * `case class AverageFunction(count: BoundReference, sum: BoundReference, base: Average)`
  * `case class MaxFunction(aggr: BoundReference, base: Max) extends AggregateFunction `
  * `case class CountFunction(aggr: BoundReference, base: Count)`
  * `case class CountDistinctFunction(aggr: BoundReference, base: CountDistinct)`
  * `case class SumFunction(aggr: BoundReference, base: Sum) extends AggregateFunction `
  * `case class FirstFunction(aggr: BoundReference, base: First) extends AggregateFunction `
  * `case class LastFunction(aggr: BoundReference, base: AggregateExpression) extends AggregateFunction `
  * `sealed case class AggregateFunctionBind(`
  * `sealed class KeyBufferSeens(`
  * `sealed trait Aggregate `
  * `sealed trait PreShuffle extends Aggregate `
  * `sealed trait PostShuffle extends Aggregate `
  * `case class AggregatePreShuffle(`
  * `case class AggregatePostShuffle(`
  * `case class DistinctAggregate(`





