[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21497
  
cc. @tdas @jose-torres @jerryshao @arunmahadevan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
Also added a custom metric for the count of versions stored in loadedMaps.

Here is a new screenshot:
https://user-images.githubusercontent.com/1317309/40978481-b46ad324-690e-11e8-9b0f-e80528612a62.png



---




[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21497
  
retest this please


---




[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21497
  
The failing tests were:
* org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is a sbt.testing.NestedSuiteSelector)
* org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is a sbt.testing.NestedSuiteSelector)

These test failures are not relevant to the patch.


---




[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...

2018-06-05 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21497

[SPARK-24466][SS] Fix TextSocketMicroBatchReader to be compatible with 
netcat again

## What changes were proposed in this pull request?

TextSocketMicroBatchReader is no longer compatible with netcat because it 
launches a temporary reader to read the schema, closes that reader, and then 
re-opens a reader. While a reliable socket server should be able to handle 
this without any issue, the nc command normally can't handle multiple 
connections and simply exits when the temporary reader is closed.

This patch makes TextSocketMicroBatchReader compatible with netcat again by 
deferring the opening of the socket to the first call of 
planInputPartitions() instead of the constructor.
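
The deferral described above can be sketched roughly as follows. This is only an illustration of the idea, not the actual patch: `DeferredSocketReader`, its `connect` parameter, `readSchema()`, `connectionCount` and `stop()` are hypothetical names, and only `planInputPartitions()` is taken from the description.

```scala
// Hypothetical stand-in for a socket-backed reader; not Spark's actual class.
class DeferredSocketReader(connect: () => AutoCloseable) {
  private var connection: Option[AutoCloseable] = None
  private var opened = 0

  // The schema is static in this sketch, so reading it never touches the socket.
  def readSchema(): Seq[String] = Seq("value")

  // The connection is opened lazily, on the first call only.
  def planInputPartitions(): Unit = synchronized {
    if (connection.isEmpty) {
      connection = Some(connect())
      opened += 1
    }
  }

  // Number of times the underlying connection has been opened so far.
  def connectionCount: Int = synchronized(opened)

  def stop(): Unit = synchronized {
    connection.foreach(_.close())
    connection = None
  }
}
```

With this shape, a single-connection server such as nc sees exactly one connection, because merely asking for the schema opens nothing.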

## How was this patch tested?

Added a unit test which fails on the current code and succeeds with the 
patch. Also tested manually.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24466

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21497.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21497


commit 7b875279742fa295ab513cf8a489830237953d0c
Author: Jungtaek Lim 
Date:   2018-06-05T07:57:42Z

SPARK-24466 Fix TextSocketMicroBatchReader to be compatible with netcat 
again




---




[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
Looks like SizeEstimator.estimate() adds the size only once for objects with 
the same identity, so SizeEstimator.estimate() is working correctly in this 
case. There might be other valid cases, but I'm not sure.
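
To illustrate what "added only once for the same identity" means, here is a deliberately simplified sketch. It is not Spark's SizeEstimator; `IdentitySizeSketch`, `Row` and `estimate` are hypothetical names, and the "size" is just a number stored on each object.

```scala
import java.util.{IdentityHashMap => JIdentityHashMap}

// Hypothetical, simplified estimator; the real SizeEstimator is far more detailed.
object IdentitySizeSketch {
  final class Row(val bytes: Int)

  // Sums row sizes over several "versions", counting each distinct instance once.
  def estimate(versions: Seq[Seq[Row]]): Long = {
    val visited = new JIdentityHashMap[AnyRef, java.lang.Boolean]()
    var total = 0L
    for (version <- versions; row <- version) {
      // put returns null only the first time this exact instance is seen
      if (visited.put(row, java.lang.Boolean.TRUE) == null) total += row.bytes
    }
    total
  }
}
```

A row referenced from two versions contributes its size once, so sharing row objects across versions does not inflate the estimate in this model.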


---




[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21477
  
Thanks @HyukjinKwon for cc'ing me. I haven't covered the Python part of 
Structured Streaming, so it will take some time to go through the code. I 
hope I can participate in the review in time.


---




[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-04 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
@arunmahadevan 
I didn't add the metric to StateOperatorProgress because this behavior is 
specific to HDFSBackedStateStoreProvider (though this is the only 
implementation available in Apache Spark), so I'm not sure this metric can be 
treated as a general one. (@tdas what do you think about this?)

Btw, the cache is cleaned up only when the maintenance operation runs, so 
there could be more than 100 versions in the map. I'm not sure why it shows 
150x, but I couldn't find anything missing in the patch. Maybe the issue is 
in SizeEstimator.estimate()?

One thing we need to check is how SizeEstimator.estimate() calculates the 
memory usage when UnsafeRow objects are shared across versions. If 
SizeEstimator adds the size of an object every time it is referenced, it will 
report much higher memory usage than the actual usage.


---




[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total size of states in HDFSBac...

2018-06-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
@jose-torres 
Ah yes, I forgot that a shallow copy occurs, so while each new map holds its 
own map entries, the row objects are shared across versions. Thanks for 
pointing it out. I will update the description.
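
A small sketch of the shallow-copy behaviour discussed here, under the assumption that a new version is built by copying the entries of the previous map. `ShallowCopySketch`, `Row` and `nextVersion` are hypothetical names, and the map type is chosen only for illustration.

```scala
import java.util.{HashMap => JHashMap}

// Hypothetical illustration; the names and the map type are assumptions.
object ShallowCopySketch {
  final class Row(var value: Int)

  // Builds the next version by copying entries; the Row values are not cloned.
  def nextVersion(prev: JHashMap[String, Row]): JHashMap[String, Row] = {
    val next = new JHashMap[String, Row]()
    next.putAll(prev)
    next
  }
}
```

Each version pays for its own map entries, but the row objects behind those entries are the same instances in both maps.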


---




[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total size of states in HDFSBac...

2018-06-01 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
Thanks @HyukjinKwon for reviewing. Updated the PR title and fixed the nit.


---




[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total size of states in HDFSBac...

2018-05-31 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
cc. @tdas @jose-torres @jerryshao @HyukjinKwon @arunmahadevan 


---




[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total size of states in ...

2018-05-31 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21469

[SPARK-24441][SS] Expose total size of states in HDFSBackedStateStoreProvider

## What changes were proposed in this pull request?

This patch exposes the estimated size of the cache (loadedMaps) in 
HDFSBackedStateStoreProvider as a custom metric of StateStore. While it refers 
to loadedMaps directly, only one StateStoreWriter refers to a given 
StateStoreProvider, so the value is neither exposed nor aggregated multiple 
times. The current state metrics are safe to aggregate for the same reason.

## How was this patch tested?

Tested manually. Below is a screenshot of the UI page reflecting the patch:

https://user-images.githubusercontent.com/1317309/40788976-4ad93d8c-652c-11e8-88f1-337be5162588.png


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24441

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21469.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21469


commit dc11338650842a246a4bce9280d130607ceca281
Author: Jungtaek Lim 
Date:   2018-05-31T14:38:00Z

[SPARK-24441][SS] Expose total size of states in 
HDFSBackedStateStoreProvider

* expose estimation of size of cache (loadMaps) in 
HDFSBackedStateStoreProvider
  * as a custom metric of StateStore




---




[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191629554
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala ---
@@ -58,39 +46,29 @@ class ContinuousShuffleReadSuite extends StreamTest {
     super.afterEach()
   }
 
-  test("receiver stopped with row last") {
-    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
-    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
-    send(
-      endpoint,
-      ReceiverEpochMarker(0),
-      ReceiverRow(0, unsafeRow(111))
-    )
+  private implicit def unsafeRow(value: Int) = {
--- End diff --

Also, where is the `implicit` modifier of this method leveraged? I'm not 
sure it is really needed, but I'm reviewing on the GitHub page, so I might be 
missing something here.
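
For context on the question, this is what an `implicit def` of that shape would enable in general. The sketch below is standalone and uses hypothetical names (`ImplicitConversionSketch`, `Row`, `toRow`, `send`); it does not reproduce the suite's helpers.

```scala
import scala.language.implicitConversions

// Hypothetical example; none of these names come from the PR.
object ImplicitConversionSketch {
  final case class Row(value: Int)

  // With `implicit`, the compiler inserts this call wherever an Int is given but a Row is expected.
  implicit def toRow(value: Int): Row = Row(value)

  def send(rows: Row*): Seq[Row] = rows.toSeq

  // Each Int literal is converted through toRow implicitly.
  val viaImplicit: Seq[Row] = send(111, 222)

  // Equivalent explicit form, which compiles even without the `implicit` modifier.
  val viaExplicit: Seq[Row] = send(toRow(111), toRow(222))
}
```

If every call site already wraps its argument explicitly, as in the second form, the modifier has no effect and can be dropped.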


---




[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191605388
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala ---
@@ -58,39 +46,29 @@ class ContinuousShuffleReadSuite extends StreamTest {
     super.afterEach()
   }
 
-  test("receiver stopped with row last") {
-    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
-    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
-    send(
-      endpoint,
-      ReceiverEpochMarker(0),
-      ReceiverRow(0, unsafeRow(111))
-    )
+  private implicit def unsafeRow(value: Int) = {
--- End diff --

Just curious: is there a reason to rearrange the functions, this one and the 
two below? They look the same except for this function being changed to 
`implicit`.


---




[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191629272
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala ---
@@ -288,4 +267,153 @@ class ContinuousShuffleReadSuite extends StreamTest {
     val thirdEpoch = rdd.compute(rdd.partitions(0), ctx).map(_.getUTF8String(0).toString).toSet
     assert(thirdEpoch == Set("writer1-row1", "writer2-row0"))
   }
+
+  test("one epoch") {
+    val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val writer = new RPCContinuousShuffleWriter(
+      0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+    writer.write(Iterator(1, 2, 3))
+
+    assert(readEpoch(reader) == Seq(1, 2, 3))
+  }
+
+  test("multiple epochs") {
+    val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val writer = new RPCContinuousShuffleWriter(
+      0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+    writer.write(Iterator(1, 2, 3))
+    writer.write(Iterator(4, 5, 6))
+
+    assert(readEpoch(reader) == Seq(1, 2, 3))
+    assert(readEpoch(reader) == Seq(4, 5, 6))
+  }
+
+  test("empty epochs") {
+    val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val writer = new RPCContinuousShuffleWriter(
+      0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+    writer.write(Iterator())
+    writer.write(Iterator(1, 2))
+    writer.write(Iterator())
+    writer.write(Iterator())
+    writer.write(Iterator(3, 4))
+    writer.write(Iterator())
+
+    assert(readEpoch(reader) == Seq())
+    assert(readEpoch(reader) == Seq(1, 2))
+    assert(readEpoch(reader) == Seq())
+    assert(readEpoch(reader) == Seq())
+    assert(readEpoch(reader) == Seq(3, 4))
+    assert(readEpoch(reader) == Seq())
+  }
+
+  test("blocks waiting for writer") {
+    val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val writer = new RPCContinuousShuffleWriter(
+      0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+    val readerEpoch = reader.compute(reader.partitions(0), ctx)
+
+    val readRowThread = new Thread {
+      override def run(): Unit = {
+        assert(readerEpoch.toSeq.map(_.getInt(0)) == Seq(1))
+      }
+    }
+    readRowThread.start()
+
+    eventually(timeout(streamingTimeout)) {
+      assert(readRowThread.getState == Thread.State.TIMED_WAITING)
+    }
+
+    // Once we write the epoch the thread should stop waiting and succeed.
+    writer.write(Iterator(1))
+    readRowThread.join()
+  }
+
+  test("multiple writer partitions") {

Would we want to have another test which covers out-of-order epochs between 
writers (if that's a valid case for us), or should we rely on the test in 
ContinuousShuffleReadRDD?


---




[GitHub] spark issue #21445: [SPARK-24404][SS] Increase currentEpoch when meet a Epoc...

2018-05-29 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21445
  
@LiangchangZ 

> In the real CP situation, reader and writer may be always in different 
tasks, right?

Continuous mode already supports some valid use cases, and putting 
everything in one task would be fastest in such use cases, while tasks can 
still be parallelized by partition. Unless we have a valid reason to separate 
the reader and writer even in a non-shuffle query, it would be better to keep 
it as it is.


---




[GitHub] spark issue #21445: [SPARK-24404][SS] Increase currentEpoch when meet a Epoc...

2018-05-28 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21445
  
@LiangchangZ 
Looks like the patch is needed only with #21353 #21332 #21293 as of now, 
right? If so, please state the condition in the JIRA issue description as 
well as the PR description so that we don't get confused.

There's a case where the reader and writer are composed together in a task 
(the current state of continuous processing), and after this patch there 
will be two places which increase the epoch for the same thread. Please note 
that I'm commenting on top of the current implementation, not considering 
#21353 #21332 #21293.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190131693
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      // An array of flags for whether each writer ID has gotten an epoch marker.
+      private val writerEpochMarkersReceived =
+        mutable.Map.empty[Int, Boolean].withDefaultValue(false)
+
+      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
--- End diff --

I'm also now seeing this approach as an alternative way to deal with 
alignment (not buffering rows explicitly, but simply not reading from a 
writer after its epoch marker comes in). Nice approach.
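
A standalone sketch of this alignment idea, loosely modelled on the diff above but simplified. `EpochAlignmentSketch`, `Msg`, `Row`, `EpochMarker` and `readEpoch` are hypothetical names, and the sketch assumes at least one writer and one well-formed epoch per queue.

```scala
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors, LinkedBlockingQueue}
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplification; not the UnsafeRowReceiver implementation.
object EpochAlignmentSketch {
  sealed trait Msg
  final case class Row(writerId: Int, value: Int) extends Msg
  final case class EpochMarker(writerId: Int) extends Msg

  // Reads one epoch from per-writer queues. Requires queues.length >= 1.
  def readEpoch(queues: Array[LinkedBlockingQueue[Msg]]): Seq[Int] = {
    val executor = Executors.newFixedThreadPool(queues.length)
    val completion = new ExecutorCompletionService[Msg](executor)

    def takeFrom(writerId: Int): Callable[Msg] = new Callable[Msg] {
      override def call(): Msg = queues(writerId).take()
    }

    // Start by reading one message from every writer.
    queues.indices.foreach(id => completion.submit(takeFrom(id)))

    val rows = ArrayBuffer.empty[Int]
    var writersRemaining = queues.length
    try {
      while (writersRemaining > 0) {
        completion.take().get() match {
          case Row(writerId, value) =>
            rows += value
            // Keep reading from the writer that just produced a row.
            completion.submit(takeFrom(writerId))
          case EpochMarker(_) =>
            // No new read is submitted, so later rows stay queued for the next epoch.
            writersRemaining -= 1
        }
      }
    } finally {
      executor.shutdownNow()
    }
    rows.toList
  }
}
```

Rows that a fast writer has already sent for the next epoch are never pulled, so they do not need to be buffered separately; they simply wait in that writer's queue.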


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190129892
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      // An array of flags for whether each writer ID has gotten an epoch marker.
+      private val writerEpochMarkersReceived =
+        mutable.Map.empty[Int, Boolean].withDefaultValue(false)
+
+      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
+      private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+      private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
+        override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
       }
 
-      override def close(): Unit = {}
+      // Initialize by submitting tasks to read the first row from each writer.
+      (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
+
+      /**
+       * In each call to getNext(), we pull the next row available in the completion queue, and then
+       * submit another task to read the next row from the writer which returned it.
+       *
+       * When a writer sends an epoch marker, we note that it's finished and don't submit another
+       * task for it in this epoch. The iterator is over once all writers have sent an epoch marker.
+       */
+      override def getNext(): UnsafeRow = {
+        var nextRow: UnsafeRow = null
+        while (nextRow == null) {
+          nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match {
+            case null =>
+              // Try again if the poll didn't wait long enough to get a real result.
+              // But we should be getting at least an epoch marker every checkpoint interval.
+              logWarning(
+                s"Completion service failed to make progress after $checkpointIntervalMs ms")
+              null
+
+            // The completion service guarantees this future will be available immediately.
+            case future => future.get() match {
+              case ReceiverRow(writerId, r) =>
+                // Start reading the next element in the queue we just took from.
+                completion.submit(completionTask(writerId))
+                r
+              // TODO use writerId
--- End diff --

It looks like this is not needed as of now.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190125731
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      // An array of flags for whether each writer ID has gotten an epoch marker.
+      private val writerEpochMarkersReceived =
+        mutable.Map.empty[Int, Boolean].withDefaultValue(false)
+
+      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
--- End diff --

As I commented earlier in the design doc, I was in favor of a single queue, 
because I thought it minimizes the thread count, which may avoid unnecessary 
contention (as well as code complexity in this case), and it also makes the 
condition for backpressure fairly simple (if RPC requests can block 
indefinitely until the queue has room to write). 

But as I read some articles regarding `multiple writers, single reader on 
single queue` vs `single writer, single reader on multiple queues per writer`, 
unless we introduce a highly optimized queue like Disruptor, the second 
approach looks like it performs better. 

So the approach looks great to me for now, and we could at least consider 
replacing this with a queue library if we encounter a bad situation.
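
As a rough illustration of how a bounded queue per writer could give a simple backpressure signal, here is a sketch using the JDK's `ArrayBlockingQueue`. `BackpressureSketch` and `WriterQueues` are hypothetical names, and the capacity and timeout values are arbitrary; nothing here is taken from the PR.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical illustration of one bounded queue per writer.
object BackpressureSketch {
  final class WriterQueues(numWriters: Int, capacity: Int) {
    private val queues = Array.fill(numWriters)(new ArrayBlockingQueue[Int](capacity))

    // Returns false when the writer's queue stays full for the whole timeout,
    // which is the signal a caller could use to slow that writer down.
    def offer(writerId: Int, row: Int, timeoutMs: Long): Boolean =
      queues(writerId).offer(row, timeoutMs, TimeUnit.MILLISECONDS)

    // Blocks until a row from the given writer is available.
    def take(writerId: Int): Int = queues(writerId).take()
  }
}
```

Because each queue has exactly one producer and one consumer, a full queue only ever stalls the writer that filled it.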


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190120836
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      // An array of flags for whether each writer ID has gotten an epoch marker.
+      private val writerEpochMarkersReceived =
--- End diff --

The map will only ever contain `(writerId, true)` entries, so the value is 
not needed at all, and we are only concerned with writerIds in the range 
0 until numShuffleWriters, so it might be better to consider alternatives as 
well.

It looks like this could also be a Set pre-initialized with 0 until 
numShuffleWriters, from which we remove an element when we receive its 
marker. If an element is still in the set, it means we haven't received a 
marker from that writer yet.

Similarly, it could be a pre-initialized Array of Boolean with true/false 
values.
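
The Set-based alternative could look roughly like the sketch below. `EpochMarkerTracker` and its methods are hypothetical names used only to make the suggestion concrete.

```scala
import scala.collection.mutable

// Hypothetical helper illustrating the Set-based bookkeeping; not code from the PR.
final class EpochMarkerTracker(numShuffleWriters: Int) {
  // Writer IDs whose epoch marker has not arrived yet in the current epoch.
  private val pending: mutable.Set[Int] = mutable.Set.empty[Int]
  pending ++= (0 until numShuffleWriters)

  def markerReceived(writerId: Int): Unit = pending -= writerId

  def awaiting(writerId: Int): Boolean = pending.contains(writerId)

  def epochComplete: Boolean = pending.isEmpty

  // Re-arms the tracker for the next epoch.
  def startNextEpoch(): Unit = pending ++= (0 until numShuffleWriters)
}
```

Membership in the set replaces the Boolean value, and "all markers received" becomes a plain emptiness check.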


---




[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...

2018-05-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21388
  
@hvanhovell 
I also think some people might not want reflection magic (I was one of them, 
but realized I had to use it here), so I'm happy to close the PR if others 
voice the same opinion on this too.

For me, reflection looks like the only way to achieve `Can we automate these 
'pass through' operations?`, so if we decide to reject the approach, it might 
be better to either remove that line or add a description of the 
restriction(s) instead, unless we have another immediate idea to achieve it 
without reflection.

Btw, I'd be very happy if you could spend some time explaining which points 
make you concerned about reflection in the planner. Adding that explanation 
explicitly might prevent similar attempts by contributors and save our time.


---




[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...

2018-05-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21388
  
retest this please


---




[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...

2018-05-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21388
  
Thanks @HyukjinKwon for reviewing. Addressed review comments.


---




[GitHub] spark pull request #21388: [SPARK-24336][SQL] Support 'pass through' transfo...

2018-05-21 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21388

[SPARK-24336][SQL] Support 'pass through' transformation in BasicOperators

## What changes were proposed in this pull request?

Enable 'pass through' transformation in BasicOperators via reflection, so 
that every pair of transformations which only requires converting a 
LogicalPlan to a SparkPlan by calling `planLater()` can be transformed 
automatically. A new pair only needs to be added to the map.
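
To make the idea concrete, here is a toy sketch of a reflection-driven pass-through mapping. It is not the code in this PR and does not use Spark's classes: `PassThroughSketch` and every type in it are hypothetical, and the recursive `plan(child)` call merely stands in for what `planLater()` does in the real planner.

```scala
// Toy model with hypothetical types; only the overall technique mirrors the description.
object PassThroughSketch {
  sealed trait LogicalPlan extends Product
  final case class LogicalScan(table: String) extends LogicalPlan
  final case class LogicalLimit(n: Int, child: LogicalPlan) extends LogicalPlan

  sealed trait SparkPlan
  final case class ScanExec(table: String) extends SparkPlan
  final case class LimitExec(n: Int, child: SparkPlan) extends SparkPlan

  // Supporting a new pass-through operator means adding one pair here.
  private val passThrough: Map[Class[_], Class[_]] = Map(
    classOf[LogicalScan] -> classOf[ScanExec],
    classOf[LogicalLimit] -> classOf[LimitExec])

  def plan(logical: LogicalPlan): SparkPlan = {
    val target = passThrough(logical.getClass)
    // Convert child plans recursively and pass every other field through unchanged.
    val args = logical.productIterator.map[AnyRef] {
      case child: LogicalPlan => plan(child)
      case other => other.asInstanceOf[AnyRef]
    }.toSeq
    // Assumes the physical class has a single public constructor with matching parameters.
    target.getConstructors.head.newInstance(args: _*).asInstanceOf[SparkPlan]
  }
}
```

The trade-off raised in the review is visible even in this toy: a mismatch between the paired constructors only surfaces at runtime, not at compile time.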

## How was this patch tested?

Covered by existing unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24336

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21388.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21388


commit 4c2f700a5d7944c13681581df7379e9653c5d588
Author: Jungtaek Lim <kabhwan@...>
Date:   2018-05-21T23:31:26Z

SPARK-24336 Support 'pass through' transformation in BasicOperators




---




[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...

2018-05-17 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21357
  
cc. @tdas 


---




[GitHub] spark pull request #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStorePr...

2018-05-17 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21357

[SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider to remove 
duplicated logic between operations on delta file and snapshot file

## What changes were proposed in this pull request?

This patch refactors HDFSBackedStateStoreProvider to extract the logic 
duplicated between operations on delta files and snapshot files, and 
documents the structure of the state file.

## How was this patch tested?

Existing unit tests: StateStoreSuite

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24311

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21357.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21357


commit 8ad2a3f8112662a865ee1dbaf7c5269197c3ee4f
Author: Jungtaek Lim <kabhwan@...>
Date:   2018-05-17T21:17:30Z

SPARK-24311 Refactor HDFSBackedStateStoreProvider to remove duplicated 
logic between operations on delta file and snapshot file

* also removed unused import statements




---




[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188638980
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+class ContinuousShuffleReadSuite extends StreamTest {
+
+  private def unsafeRow(value: Int) = {
+    UnsafeProjection.create(Array(IntegerType : DataType))(
+      new GenericInternalRow(Array(value: Any)))
+  }
+
+  var ctx: TaskContextImpl = _
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    ctx = TaskContext.empty()
+    TaskContext.setTaskContext(ctx)
+  }
+
+  override def afterEach(): Unit = {
+    ctx.markTaskCompleted(None)
+    ctx = null
+    super.afterEach()
+  }
+
+  test("receiver stopped with row last") {
+    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+    endpoint.askSync[Unit](ReceiverEpochMarker())
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+
+    ctx.markTaskCompleted(None)
+    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+    eventually(timeout(streamingTimeout)) {
+      assert(receiver.stopped.get())
+    }
+  }
+
+  test("receiver stopped with marker last") {
+    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+    endpoint.askSync[Unit](ReceiverEpochMarker())
+
+    ctx.markTaskCompleted(None)
+    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+    eventually(timeout(streamingTimeout)) {
+      assert(receiver.stopped.get())
+    }
+  }
+
+  test("one epoch") {
+    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+    endpoint.askSync[Unit](ReceiverEpochMarker())
+
+    val iter = rdd.compute(rdd.partitions(0), ctx)
+    assert(iter.next().getInt(0) == 111)
+    assert(iter.next().getInt(0) == 222)
+    assert(iter.next().getInt(0) == 333)
+    assert(!iter.hasNext)
+  }
+
+  test("multiple epochs") {
+    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+    endpoint.askSync[Unit](ReceiverEpochMarker())
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+    endpoint.askSync[Unit](ReceiverEpochMarker())
+
+    val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
+    assert(firstEpoch.next().getInt(0) == 111)
+    assert(!firstEpoch.hasNext)
+
+    val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
+    assert(secondEpoch.next().getInt(0) == 222)
+    assert(secondEpoch.next().getInt(0) == 333)
+    assert(!secondEpoch.hasNext)
+  }
+
+  test("empty epochs

[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188632188
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call compute() multiple times.
+  lazy val (receiver, endpoint) = {
+    val env = SparkEnv.get.rpcEnv
+    val receiver = new UnsafeRowReceiver(env)
+    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+    TaskContext.get().addTaskCompletionListener { ctx =>
+      env.stop(endpoint)
+    }
+    (receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading from the
+ */
+class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int)
+    extends RDD[UnsafeRow](sc, Nil) {
+
+  override protected def getPartitions: Array[Partition] = {
+    (0 until numPartitions).map(ContinuousShuffleReadPartition).toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+    val receiver = split.asInstanceOf[ContinuousShuffleReadPartition].receiver
+
+    new NextIterator[UnsafeRow] {
+      override def getNext(): UnsafeRow = receiver.poll() match {
+        case ReceiverRow(r) => r
+        case ReceiverEpochMarker() =>
--- End diff --

Does this ensure at-least-once delivery? If so, we could start from this and 
improve it in another PR, as @jose-torres stated.
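
For readers following the quoted diff, here is a minimal, self-contained sketch of how an epoch marker can end one iterator while leaving later rows queued for the next call, which is the behavior the "multiple epochs" test exercises. `Message`, `Row`, `EpochMarker`, and `readEpoch` are hypothetical stand-ins rather than the classes in the patch, and the sketch makes no claim about delivery guarantees.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

object EpochSketch {
  // Hypothetical stand-ins for ReceiverRow / ReceiverEpochMarker in the quoted diff.
  sealed trait Message
  final case class Row(value: Int) extends Message
  case object EpochMarker extends Message

  // Returns an iterator over the rows of exactly one epoch. It blocks on the
  // queue and reports the end once it takes an epoch marker off the queue;
  // rows queued after that marker are left for the next call.
  def readEpoch(queue: BlockingQueue[Message]): Iterator[Int] = new Iterator[Int] {
    private var nextRow: Option[Int] = None
    private var finished = false

    def hasNext: Boolean = {
      if (!finished && nextRow.isEmpty) {
        queue.take() match {
          case Row(v) => nextRow = Some(v)
          case EpochMarker => finished = true
        }
      }
      nextRow.isDefined
    }

    def next(): Int = {
      if (!hasNext) throw new NoSuchElementException("end of epoch")
      val v = nextRow.get
      nextRow = None
      v
    }
  }

  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[Message]()
    Seq[Message](Row(111), EpochMarker, Row(222), Row(333), EpochMarker)
      .foreach(m => queue.put(m))
    println(readEpoch(queue).toList) // List(111)
    println(readEpoch(queue).toList) // List(222, 333)
  }
}
```

Because the marker is consumed by the iterator that reaches it, each call to `readEpoch` starts cleanly at the first row of the following epoch.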


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188628202
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call compute() multiple times.
+  lazy val (receiver, endpoint) = {
+    val env = SparkEnv.get.rpcEnv
+    val receiver = new UnsafeRowReceiver(env)
+    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+    TaskContext.get().addTaskCompletionListener { ctx =>
+      env.stop(endpoint)
+    }
+    (receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading from the
--- End diff --

If my understanding is right, the bottom RDD will be the one injected just 
before shuffling, so it would be neither a reader nor a writer.

`first` and `last` would be good alternatives for me if `bottom` looks 
ambiguous.

As @arunmahadevan stated, the comment looks incomplete.


---




[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188636306
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or an epoch marker.
+ */
+private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
+private[shuffle] case class ReceiverRow(row: UnsafeRow) extends UnsafeRowReceiverMessage
+private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessage
+
+/**
+ * RPC endpoint for receiving rows into a continuous processing shuffle task.
+ */
+private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv)
+    extends ThreadSafeRpcEndpoint with Logging {
+  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](1024)
--- End diff --

I guess we can handle 2 as a TODO if we would like to focus on the proposed patch.


---




[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-05-14 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21222
  
Kindly ping. I guess debugging the last batch might not be that attractive, 
but printing codegen would be helpful to someone who wants to investigate or 
debug in detail.


---




[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...

2018-05-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21222#discussion_r186247474
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---
@@ -116,6 +168,30 @@ package object debug {
 }
   }
 
+  implicit class DebugStreamQuery(query: StreamingQuery) extends Logging {
+    def debug(): Unit = {
+      query match {
+        case w: StreamExecution =>
--- End diff --

My bad. Fixed. I also changed the unit tests so that the case you reported is 
covered.


---




[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...

2018-05-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21222#discussion_r186032252
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---
@@ -88,23 +100,62 @@ package object debug {
 }
   }
 
+  /**
+   * Get WholeStageCodegenExec subtrees and the codegen in a query plan into one String
+   *
+   * @param query the streaming query for codegen
+   * @return single String containing all WholeStageCodegen subtrees and corresponding codegen
+   */
+  def codegenString(query: StreamingQuery): String = {
+    val msg = query match {
+      case w: StreamExecution if w.lastExecution != null =>
--- End diff --

Addressed.


---




[GitHub] spark issue #21207: SPARK-24136: Fix MemoryStreamDataReader.next to skip sle...

2018-05-03 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21207
  
@tdas @jerryshao @HyukjinKwon 
Kindly ping to trigger test and review.


---




[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-05-03 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21222
  
@tdas @jose-torres @jerryshao @arunmahadevan
Kindly ping to review.


---




[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...

2018-05-02 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21222

[SPARK-24161][SS] Enable debug package feature on structured streaming

## What changes were proposed in this pull request?

Currently, the debug package has an implicit class "DebugQuery" which matches 
Dataset to provide debug features on the Dataset class. It doesn't work with 
structured streaming, where debugging requires that the query has already 
started and the information is retrieved from StreamingQuery, not Dataset. I 
guess that's why "explain" had to be placed on StreamingQuery even though it 
already exists on Dataset.

This patch adds a new implicit class "DebugStreamQuery" which matches 
StreamingQuery to provide similar debug features on the StreamingQuery class.
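
The implicit-class pattern described above can be sketched without a Spark dependency. Everything in this sketch is hypothetical and for illustration only: `FakeQuery`, `StartedQuery`, `lastPlan`, `DebugFakeQuery`, and `describe()` are not names from the patch or from Spark.

```scala
object ImplicitDebugSketch {
  // Hypothetical stand-in for a streaming query handle; not Spark's StreamingQuery.
  trait FakeQuery { def name: String }

  // Hypothetical stand-in for a started query; lastPlan is empty until a batch has run.
  final case class StartedQuery(name: String, lastPlan: Option[String]) extends FakeQuery

  // The implicit class adds describe() to any FakeQuery in scope
  // without changing FakeQuery itself.
  implicit class DebugFakeQuery(query: FakeQuery) {
    def describe(): String = query match {
      case StartedQuery(n, Some(plan)) => s"$n: $plan"
      case StartedQuery(n, None) => s"$n: no batch executed yet"
      case other => s"${other.name}: unsupported query type"
    }
  }

  def main(args: Array[String]): Unit = {
    val q: FakeQuery = StartedQuery("q1", Some("Project -> Scan"))
    println(q.describe()) // q1: Project -> Scan
  }
}
```

Matching on the concrete type inside the extension method is what lets the added method report a not-yet-run or unsupported query instead of failing.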

## How was this patch tested?

Added relevant unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24161

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21222.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21222


commit c1ad1c557e6165455457adb6f148d6d9616548a1
Author: Jungtaek Lim <kabhwan@...>
Date:   2018-05-03T02:26:48Z

SPARK-24161 Enable debug package feature on structured streaming

* added implicit class which adds debug features for StreamingQuery
* added unit tests for new functionalities




---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r18520
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+    sc: SparkContext,
+    sqlContext: SQLContext,
+    @transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] =
+    mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+    readerFactories.zipWithIndex.map {
+      case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory)
+    }.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+    // If attempt number isn't 0, this is a task retry, which we don't support.
+    if (context.attemptNumber() != 0) {
+      throw new ContinuousTaskRetryException()
+    }
+
+    val readerForPartition = dataReaders.synchronized {
+      if (!dataReaders.contains(split)) {
+        dataReaders.put(
+          split,
+          new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs))
+      }
+
+      dataReaders(split)
+    }
+
+    val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+    val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get)
+    new Iterator[UnsafeRow] {
+      private val POLL_TIMEOUT_MS = 1000
+
+      private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+      override def hasNext(): Boolean = {
+        while (currentEntry == null) {
+          if (context.isInterrupted() || context.isCompleted()) {
+            currentEntry = (null, null)
+          }
+          if (readerForPartition.dataReaderFailed.get()) {
+            throw new SparkException(
+              "data read failed", readerForPartition.dataReaderThread.failureReason)
+          }
+          if (readerForPartition.epochPollFailed.get()) {
+            throw new SparkException(
+              "epoch poll failed", readerForPartition.epochPollRunnable.failur

[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185328820
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@

[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185326551
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@

[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185317000
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+    sc: SparkContext,
+    sqlContext: SQLContext,
+    @transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] =
+    mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+    readerFactories.zipWithIndex.map {
+      case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory)
+    }.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+    // If attempt number isn't 0, this is a task retry, which we don't support.
+    if (context.attemptNumber() != 0) {
+      throw new ContinuousTaskRetryException()
+    }
+
+    val readerForPartition = dataReaders.synchronized {
+      if (!dataReaders.contains(split)) {
+        dataReaders.put(
+          split,
+          new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs))
+      }
+
+      dataReaders(split)
+    }
+
+    val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+    val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get)
+    new Iterator[UnsafeRow] {
+      private val POLL_TIMEOUT_MS = 1000
+
+      private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+      override def hasNext(): Boolean = {
+        while (currentEntry == null) {
+          if (context.isInterrupted() || context.isCompleted()) {
+            currentEntry = (null, null)
--- End diff --

Yeah, that's what I also missed. Thanks for correcting. :)


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185316062
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+// If attempt number isn't 0, this is a task retry, which we don't 
support.
+if (context.attemptNumber() != 0) {
+  throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+  if (!dataReaders.contains(split)) {
+dataReaders.put(
+  split,
+  new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
+  }
+
+  dataReaders(split)
+}
+
+val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private val POLL_TIMEOUT_MS = 1000
+
+  private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+  override def hasNext(): Boolean = {
+while (currentEntry == null) {
+  if (context.isInterrupted() || context.isCompleted()) {
+currentEntry = (null, null)
+  }
+  if (readerForPartition.dataReaderFailed.get()) {
+throw new SparkException(
+  "data read failed", 
readerForPartition.dataReaderThread.failureReason)
+  }
+  if (readerForPartition.epochPollFailed.get()) {
+throw new SparkException(
+  "epoch poll failed", 
readerForPartition.epochPollRunnable.failur

[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185282844
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+// If attempt number isn't 0, this is a task retry, which we don't 
support.
+if (context.attemptNumber() != 0) {
+  throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+  if (!dataReaders.contains(split)) {
+dataReaders.put(
+  split,
+  new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
+  }
+
+  dataReaders(split)
+}
+
+val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private val POLL_TIMEOUT_MS = 1000
+
+  private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+  override def hasNext(): Boolean = {
+while (currentEntry == null) {
+  if (context.isInterrupted() || context.isCompleted()) {
+currentEntry = (null, null)
--- End diff --

I meant that the current logic still calls `queue.poll` again instead of using 
the assigned epoch marker value, even when the `if` condition matches. That 
looks unintended, right?
We can rearrange the logic to fail fast on the exception cases and use if-else 
to fix that.
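
A rough, self-contained sketch of the arrangement I have in mind. This is not the actual Spark code: `Entry`, `nextEntry`, and the `stopped` callback are stand-ins I made up for the iterator state, and `IllegalStateException` stands in for `SparkException`.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object FailFastPollSketch {
  // Stand-in for (UnsafeRow, PartitionOffset); (null, null) is the epoch marker.
  type Entry = (String, java.lang.Long)
  val EpochMarker: Entry = (null, null)

  def nextEntry(
      queue: ArrayBlockingQueue[Entry],
      stopped: () => Boolean, // assumption: wraps isInterrupted() || isCompleted()
      dataReaderFailed: AtomicBoolean,
      epochPollFailed: AtomicBoolean,
      pollTimeoutMs: Long): Entry = {
    var currentEntry: Entry = null
    while (currentEntry == null) {
      // Fail fast on the error cases before doing anything else.
      if (dataReaderFailed.get()) throw new IllegalStateException("data read failed")
      if (epochPollFailed.get()) throw new IllegalStateException("epoch poll failed")

      if (stopped()) {
        // Keep the assigned marker; the else branch prevents the poll from replacing it.
        currentEntry = EpochMarker
      } else {
        currentEntry = queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS)
      }
    }
    currentEntry
  }
}
```

With this shape the loop exits on the assigned marker as soon as the task is stopped, and polls the queue only otherwise.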





[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185201032
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
 ---
@@ -46,28 +46,34 @@ case class WriteToContinuousDataSourceExec(writer: 
StreamWriter, query: SparkPla
   case _ => new 
InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
 }
 
-val rdd = query.execute()
+val rdd = new ContinuousWriteRDD(query.execute(), writerFactory)
+val messages = new Array[WriterCommitMessage](rdd.partitions.length)
 
 logInfo(s"Start processing data source writer: $writer. " +
-  s"The input RDD has ${rdd.getNumPartitions} partitions.")
-// Let the epoch coordinator know how many partitions the write RDD 
has.
+  s"The input RDD has ${messages.length} partitions.")
 EpochCoordinatorRef.get(
-
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
-sparkContext.env)
+  
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+  sparkContext.env)
   .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
 
 try {
   // Force the RDD to run so continuous processing starts; no data is 
actually being collected
   // to the driver, as ContinuousWriteRDD outputs nothing.
-  sparkContext.runJob(
-rdd,
-(context: TaskContext, iter: Iterator[InternalRow]) =>
-  WriteToContinuousDataSourceExec.run(writerFactory, context, 
iter),
-rdd.partitions.indices)
+  rdd.collect()
 } catch {
   case _: InterruptedException =>
-// Interruption is how continuous queries are ended, so accept and 
ignore the exception.
+  // Interruption is how continuous queries are ended, so accept and 
ignore the exception.
   case cause: Throwable =>
+logError(s"Data source writer $writer is aborting.")
--- End diff --

Could you please explain why the additional handling is needed? 
ContinuousWriteRDD still handles the error case.





[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r18524
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochPollRunnable.scala
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
+
+/**
+ * The epoch marker component of [[ContinuousQueuedDataReader]]. Populates 
the queue with
+ * (null, null) when a new epoch marker arrives.
+ */
+class EpochPollRunnable(
+queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
+context: TaskContext,
+failedFlag: AtomicBoolean)
+  extends Thread with Logging {
+  private[continuous] var failureReason: Throwable = _
+
+  private val epochEndpoint = EpochCoordinatorRef.get(
+
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), 
SparkEnv.get)
+  // Note that this is *not* the same as the currentEpoch in 
[[ContinuousDataQueuedReader]]! That
+  // field represents the epoch wrt the data being processed. The 
currentEpoch here is just a
+  // counter to ensure we send the appropriate number of markers if we 
fall behind the driver.
+  private var currentEpoch = 
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  override def run(): Unit = {
+try {
+  val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch)
+  for (i <- currentEpoch to newEpoch - 1) {
--- End diff --

Please correct me if I'm missing something. My understanding is that this 
situation (a gap bigger than 1) should only occur when the array queue gets 
full and blocks the epoch thread from putting a marker for longer than the 
trigger interval. Any other situation (the error cases) should just crash the 
whole query so that recovery happens from scratch; that's why we can ignore 
the missing-marker case.
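
To make sure we are talking about the same thing, here is how I read the catch-up loop, as a minimal sketch. The loop body is my assumption (one `(null, null)` marker per missed epoch, as the class comment describes), and `Entry` and `catchUp` are made-up names, not the PR's code.

```scala
import java.util.concurrent.LinkedBlockingQueue

object EpochCatchUpSketch {
  // Stand-in for (UnsafeRow, PartitionOffset); (null, null) is the epoch marker.
  type Entry = (String, java.lang.Long)

  // Puts one marker for each epoch in [currentEpoch, newEpoch) and returns the
  // updated local counter. A gap of 3 therefore yields 3 markers.
  def catchUp(queue: LinkedBlockingQueue[Entry], currentEpoch: Long, newEpoch: Long): Long = {
    for (_ <- currentEpoch until newEpoch) {
      queue.put((null, null))
    }
    math.max(currentEpoch, newEpoch)
  }
}
```

The sketch uses an unbounded queue only so that it never blocks. With the bounded `ArrayBlockingQueue` in the PR, `put` would block while the queue is full, which is the case I described above.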





[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185194384
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+// If attempt number isn't 0, this is a task retry, which we don't 
support.
+if (context.attemptNumber() != 0) {
+  throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+  if (!dataReaders.contains(split)) {
+dataReaders.put(
+  split,
+  new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
+  }
+
+  dataReaders(split)
+}
+
+val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private val POLL_TIMEOUT_MS = 1000
+
+  private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+  override def hasNext(): Boolean = {
+while (currentEntry == null) {
+  if (context.isInterrupted() || context.isCompleted()) {
+currentEntry = (null, null)
+  }
+  if (readerForPartition.dataReaderFailed.get()) {
+throw new SparkException(
+  "data read failed", 
readerForPartition.dataReaderThread.failureReason)
+  }
+  if (readerForPartition.epochPollFailed.get()) {
+throw new SparkException(
+  "epoch poll failed", 
readerForPartition.epochPollRunnable.failur

[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185198458
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.Closeable
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A wrapper for a continuous processing data reader, including a reading 
queue and epoch markers.
+ *
+ * This will be instantiated once per partition - successive calls to 
compute() in the
+ * [[ContinuousDataSourceRDD]] will reuse the same reader. This is 
required to get continuity of
+ * offsets across epochs.
+ *
+ * For performance reasons, this is very weakly encapsulated. There are 
six handles for the RDD:
+ *  * currentOffset - contains the offset of the most recent row which a 
compute() iterator has sent
+ *upwards. The RDD is responsible for advancing this.
+ *  * currentEpoch - the epoch which is currently occurring. The RDD is 
responsible for incrementing
+ *this before ending the compute() iterator.
+ *  * queue - the queue of incoming rows (row, offset) or epoch markers 
(null, null). The
+ *ContinuousQueuedDataReader writes into this queue, and RDD.compute() 
will read from it.
+ *  * {epochPoll|dataReader}Failed - flags to check if the epoch poll and 
data reader threads are
+ *still running. These threads won't be restarted if they fail, so the 
RDD should intercept
+ *this state when convenient to fail the query.
+ *  * close() - to close this reader when the query is going to shut down.
+ */
+class ContinuousQueuedDataReader(
+split: Partition,
+context: TaskContext,
+dataQueueSize: Int,
+epochPollIntervalMs: Long) extends Closeable {
+  private val reader = 
split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]]
+.readerFactory.createDataReader()
+
+  // Important sequencing - we must get our starting point before the 
provider threads start running
+  var currentOffset: PartitionOffset = 
ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+  var currentEpoch: Long = 
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  // This queue contains two types of messages:
+  // * (null, null) representing an epoch boundary.
+  // * (row, off) containing a data row and its corresponding 
PartitionOffset.
+  val queue = new ArrayBlockingQueue[(UnsafeRow, 
PartitionOffset)](dataQueueSize)
+
+  val epochPollFailed = new AtomicBoolean(false)
+  val dataReaderFailed = new AtomicBoolean(false)
+
+  private val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+
+  private val epochPollExecutor = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+s"epoch-poll--$coordinatorId--${context.partitionId()}")
+  val epochPollRunnable = new EpochPollRunnable(queue, context, 
epochPollFailed)
+  epochPollExecutor.scheduleWithFixedDelay(
+epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+  val dataReaderThread = new DataReaderThread(reader, queue, context, 
dataReaderFailed)
+  dataReaderThread.setDaemon(true)
+  dataReaderThread.start()
+
+  context.addTaskCompletionListener(_ => {
--- End diff --

Maybe better to just call `close` if `this` is visible.
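
Something along these lines, as a toy sketch only: `FakeTaskContext` and `QueuedReader` are hypothetical stand-ins so the example runs without Spark, and are not the real `TaskContext` API.

```scala
import java.io.Closeable
import scala.collection.mutable.ArrayBuffer

object CloseOnCompletionSketch {
  // Hypothetical stand-in for the task context's completion-listener registration.
  class FakeTaskContext {
    private val listeners = ArrayBuffer.empty[() => Unit]
    def addCompletionListener(f: () => Unit): Unit = { listeners += f; () }
    def markCompleted(): Unit = listeners.foreach(f => f())
  }

  class QueuedReader(context: FakeTaskContext) extends Closeable {
    @volatile var closed = false

    // Delegate to close() so the shutdown logic lives in exactly one place.
    context.addCompletionListener(() => close())

    override def close(): Unit = { closed = true }
  }
}
```

The benefit is that the listener and an explicit `close()` call cannot drift apart over time.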



[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185187424
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+// If attempt number isn't 0, this is a task retry, which we don't 
support.
+if (context.attemptNumber() != 0) {
+  throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+  if (!dataReaders.contains(split)) {
+dataReaders.put(
+  split,
+  new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
+  }
+
+  dataReaders(split)
+}
+
+val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private val POLL_TIMEOUT_MS = 1000
+
+  private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+  override def hasNext(): Boolean = {
+while (currentEntry == null) {
+  if (context.isInterrupted() || context.isCompleted()) {
+currentEntry = (null, null)
--- End diff --

This line is effectively a no-op unless we exit the loop afterwards, so it 
would be better to clarify the behavior and fix it.
I know this code block is the same as the existing code, so it might be off 
topic for this PR. If we would like to address it in a separate issue, I'm 
happy to file one and work on it.
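
A minimal sketch of why I call it a no-op. It is simplified and not the actual Spark code: `Entry` and `onePass` are made-up names, and one call models a single pass through the loop body as I read it.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object NoOpMarkerSketch {
  // Stand-in for (UnsafeRow, PartitionOffset); (null, null) is the epoch marker.
  type Entry = (String, java.lang.Long)

  // One pass of the loop body: assign the marker on interruption, then poll anyway.
  def onePass(queue: ArrayBlockingQueue[Entry], interrupted: Boolean): Entry = {
    var currentEntry: Entry = null
    if (interrupted) {
      currentEntry = (null, null)
    }
    // The unconditional poll replaces whatever was assigned above.
    currentEntry = queue.poll(10, TimeUnit.MILLISECONDS)
    currentEntry
  }
}
```

With an empty queue, `onePass(queue, interrupted = true)` yields `null` rather than the marker, so the surrounding `while (currentEntry == null)` loop would keep spinning.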



[GitHub] spark issue #21152: [SPARK-23688][SS] Refactor tests away from rate source

2018-04-27 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21152
  
@jerryshao Thanks for merging! My Apache JIRA ID is “kabhwan”





[GitHub] spark issue #21152: [SPARK-23688][SS] Refactor tests away from rate source

2018-04-26 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21152
  
Thanks for reviewing. I have addressed the review comments.





[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184322699
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
 val input = ContinuousMemoryStream[Int]
 
 testStream(input.toDF())(
-  AddData(input, 0, 1, 2),
-  CheckAnswer(0, 1, 2),
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2): _*),
   StopStream,
-  AddData(input, 3, 4, 5),
+  AddData(input, 3.to(5): _*),
   StartStream(),
-  CheckAnswer(0, 1, 2, 3, 4, 5))
+  CheckAnswer(0.to(5): _*))
   }
 
   test("map") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .map(r => r.getLong(0) * 2)
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().map(_.getInt(0) * 2)
 
-testStream(df, useV2Sink = true)(
-  StartStream(longContinuousTrigger),
-  AwaitEpoch(0),
-  Execute(waitForRateSourceTriggers(_, 2)),
-  IncrementEpoch(),
-  Execute(waitForRateSourceTriggers(_, 4)),
-  IncrementEpoch(),
-  CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_
+testStream(df)(
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2).map(_ * 2): _*),
--- End diff --

Got it. Looks like we could reduce the range and list out literals. Will 
update.





[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184321762
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
 val input = ContinuousMemoryStream[Int]
 
 testStream(input.toDF())(
-  AddData(input, 0, 1, 2),
-  CheckAnswer(0, 1, 2),
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2): _*),
   StopStream,
-  AddData(input, 3, 4, 5),
+  AddData(input, 3.to(5): _*),
   StartStream(),
-  CheckAnswer(0, 1, 2, 3, 4, 5))
+  CheckAnswer(0.to(5): _*))
--- End diff --

Thanks for the pointer. I started from `(0 to 5)`, but the Spark Scala style 
guide mentions avoiding infix notation, so I was a bit puzzled (I was not sure 
whether `to` counts as an operator). Will update.
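
For reference, the notations are interchangeable as far as the values go. A small sketch, where `collect` is a made-up varargs helper standing in for `AddData` / `CheckAnswer`:

```scala
object RangeNotationSketch {
  // Hypothetical varargs target, only to show the `: _*` expansion.
  def collect(values: Int*): Seq[Int] = values.toList

  val infix: Seq[Int] = collect((0 to 5): _*)        // infix notation
  val dotted: Seq[Int] = collect(0.to(5): _*)        // method-call notation
  val literals: Seq[Int] = collect(0, 1, 2, 3, 4, 5) // plain literals
}
```

All three produce the same sequence, so the choice is purely about readability.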





[GitHub] spark pull request #21063: [SPARK-23886][Structured Streaming] Update query ...

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21063#discussion_r184241625
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -111,7 +112,12 @@ trait ProgressReporter extends Logging {
 logDebug("Starting Trigger Calculation")
 lastTriggerStartTimestamp = currentTriggerStartTimestamp
 currentTriggerStartTimestamp = triggerClock.getTimeMillis()
-currentStatus = currentStatus.copy(isTriggerActive = true)
+// isTriggerActive field is kept false for ContinuousExecution
+// since it is tied to MicroBatchExecution
+this match {
--- End diff --

nit: someone may be concerned that a trait needs to be aware of its actual 
implementations. 

There appear to be two options: 
1. Extract a method that only updates currentStatus when a trigger starts, 
defaulting to `isTriggerActive = true`, and let `ContinuousExecution` override 
it. 
2. Just override `startTrigger()` in `ContinuousExecution`, call 
`super.startTrigger()`, and update currentStatus once again. This might open a 
very small window for other threads to read invalid status information 
(isTriggerActive = true), but it requires less change if that is acceptable.
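
A rough sketch of option 1, using simplified stand-in types rather than the real Spark classes. `Status`, `ReporterSketch`, `statusOnTriggerStart`, and the two subclasses are hypothetical names chosen for illustration only:

```scala
// Simplified stand-in for the streaming query status; not the real Spark class.
case class Status(isTriggerActive: Boolean)

trait ReporterSketch {
  protected var currentStatus: Status = Status(isTriggerActive = false)

  // Hook extracted from startTrigger(); the default marks the trigger as active.
  protected def statusOnTriggerStart(s: Status): Status =
    s.copy(isTriggerActive = true)

  def startTrigger(): Unit = {
    // The status is written exactly once, so no transient value is visible.
    currentStatus = statusOnTriggerStart(currentStatus)
  }

  def status: Status = currentStatus
}

// Keeps the default behavior.
class MicroBatchSketch extends ReporterSketch

// Overrides only the hook, so the trait never needs to know about this class.
class ContinuousSketch extends ReporterSketch {
  override protected def statusOnTriggerStart(s: Status): Status =
    s.copy(isTriggerActive = false)
}
```

With this shape the trait stays unaware of its implementations, and the window mentioned in option 2 does not arise because the status is assigned only once per trigger start.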


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184236837
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
 val input = ContinuousMemoryStream[Int]
 
 testStream(input.toDF())(
-  AddData(input, 0, 1, 2),
-  CheckAnswer(0, 1, 2),
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2): _*),
   StopStream,
-  AddData(input, 3, 4, 5),
+  AddData(input, 3.to(5): _*),
   StartStream(),
-  CheckAnswer(0, 1, 2, 3, 4, 5))
+  CheckAnswer(0.to(5): _*))
   }
 
   test("map") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .map(r => r.getLong(0) * 2)
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().map(_.getInt(0) * 2)
 
-testStream(df, useV2Sink = true)(
-  StartStream(longContinuousTrigger),
-  AwaitEpoch(0),
-  Execute(waitForRateSourceTriggers(_, 2)),
-  IncrementEpoch(),
-  Execute(waitForRateSourceTriggers(_, 4)),
-  IncrementEpoch(),
-  CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_))))
+testStream(df)(
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2).map(_ * 2): _*),
+  StopStream,
+  AddData(input, 3.to(5): _*),
+  StartStream(),
+  CheckAnswer(0.to(5).map(_ * 2): _*))
   }
 
   test("flatMap") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .flatMap(r => Seq(0, r.getLong(0), r.getLong(0) * 2))
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().flatMap(r => Seq(0, r.getInt(0), r.getInt(0) * 2))
 
-testStream(df, useV2Sink = true)(
-  StartStream(longContinuousTrigger),
-  AwaitEpoch(0),
-  Execute(waitForRateSourceTriggers(_, 2)),
-  IncrementEpoch(),
-  Execute(waitForRateSourceTriggers(_, 4)),
-  IncrementEpoch(),
-  CheckAnswerRowsContains(scala.Range(0, 20).flatMap(n => Seq(0, n, n * 2)).map(Row(_))))
+testStream(df)(
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2).flatMap(n => Seq(0, n, n * 2)): _*),
+  StopStream,
+  AddData(input, 3.to(5): _*),
+  StartStream(),
+  CheckAnswer(0.to(5).flatMap(n => Seq(0, n, n * 2)): _*))
   }
 
   test("filter") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .where('value > 5)
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().where('value > 5)
--- End diff --

@jose-torres What do you think about this? Would it be better to have tests 
for both untyped and typed? Code duplication would not be that large, since I 
guess the verification logic can be reused for every test.


---




[GitHub] spark issue #21152: [SPARK-23688][SS] Refactor tests away from rate source

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21152
  
Also cc @tdas, since he has been reviewing SS-related PRs (including continuous 
mode) so far.


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184213878
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
 val input = ContinuousMemoryStream[Int]
 
 testStream(input.toDF())(
-  AddData(input, 0, 1, 2),
-  CheckAnswer(0, 1, 2),
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2): _*),
   StopStream,
-  AddData(input, 3, 4, 5),
+  AddData(input, 3.to(5): _*),
   StartStream(),
-  CheckAnswer(0, 1, 2, 3, 4, 5))
+  CheckAnswer(0.to(5): _*))
   }
 
   test("map") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .map(r => r.getLong(0) * 2)
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().map(_.getInt(0) * 2)
 
-testStream(df, useV2Sink = true)(
-  StartStream(longContinuousTrigger),
-  AwaitEpoch(0),
-  Execute(waitForRateSourceTriggers(_, 2)),
-  IncrementEpoch(),
-  Execute(waitForRateSourceTriggers(_, 4)),
-  IncrementEpoch(),
-  CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_))))
+testStream(df)(
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2).map(_ * 2): _*),
--- End diff --

@jose-torres 
Yeah, my intention is to ensure that Spark operations work the same as Scala 
collection methods, but sure, enumerating is also OK since the result is easy 
to see. 
Are you in favor of enumerating the literals we already know instead of 
calculating them for all the tests, or only for this line? I would just like to 
apply the approach consistently.
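
To make the two choices concrete, here is a small plain-Scala sketch (the object and value names are only for illustration) comparing the calculated expectations with their enumerated equivalents for the inputs 0 to 2:

```scala
object ExpectedValues {
  // Calculated with Scala collection methods, mirroring the query under test.
  val mappedCalculated = 0.to(2).map(_ * 2)
  val flatMappedCalculated = 0.to(2).flatMap(n => Seq(0, n, n * 2))

  // The same expectations enumerated as literals.
  val mappedLiteral = Seq(0, 2, 4)
  val flatMappedLiteral = Seq(0, 0, 0, 0, 1, 2, 0, 2, 4)
}
```

The calculated form documents the intent, while the literal form shows the expected rows directly; for `flatMap` the literal list is already noticeably longer.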


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184006570
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
 val input = ContinuousMemoryStream[Int]
 
 testStream(input.toDF())(
-  AddData(input, 0, 1, 2),
-  CheckAnswer(0, 1, 2),
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2): _*),
   StopStream,
-  AddData(input, 3, 4, 5),
+  AddData(input, 3.to(5): _*),
   StartStream(),
-  CheckAnswer(0, 1, 2, 3, 4, 5))
+  CheckAnswer(0.to(5): _*))
   }
 
   test("map") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .map(r => r.getLong(0) * 2)
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().map(_.getInt(0) * 2)
 
-testStream(df, useV2Sink = true)(
-  StartStream(longContinuousTrigger),
-  AwaitEpoch(0),
-  Execute(waitForRateSourceTriggers(_, 2)),
-  IncrementEpoch(),
-  Execute(waitForRateSourceTriggers(_, 4)),
-  IncrementEpoch(),
-  CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_))))
+testStream(df)(
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2).map(_ * 2): _*),
+  StopStream,
+  AddData(input, 3.to(5): _*),
+  StartStream(),
+  CheckAnswer(0.to(5).map(_ * 2): _*))
   }
 
   test("flatMap") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .flatMap(r => Seq(0, r.getLong(0), r.getLong(0) * 2))
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().flatMap(r => Seq(0, r.getInt(0), r.getInt(0) * 2))
 
-testStream(df, useV2Sink = true)(
-  StartStream(longContinuousTrigger),
-  AwaitEpoch(0),
-  Execute(waitForRateSourceTriggers(_, 2)),
-  IncrementEpoch(),
-  Execute(waitForRateSourceTriggers(_, 4)),
-  IncrementEpoch(),
-  CheckAnswerRowsContains(scala.Range(0, 20).flatMap(n => Seq(0, n, n * 2)).map(Row(_))))
+testStream(df)(
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2).flatMap(n => Seq(0, n, n * 2)): _*),
+  StopStream,
+  AddData(input, 3.to(5): _*),
+  StartStream(),
+  CheckAnswer(0.to(5).flatMap(n => Seq(0, n, n * 2)): _*))
   }
 
   test("filter") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .where('value > 5)
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().where('value > 5)
--- End diff --

I intended to use an untyped filter because of SPARK-24061. Once #21136 is 
merged we could change this, but I am not sure we want to have both untyped and 
typed variants for every test.


---




[GitHub] spark issue #21152: [SPARK-23688][SS] Refactor tests away from rate source

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21152
  
@jose-torres Please review this PR. Thanks!
cc. @jerryshao @HyukjinKwon 


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-25 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21152

[SPARK-23688][SS] Refactor tests away from rate source

## What changes were proposed in this pull request?

Replace the rate source with the memory source in continuous mode. Keep using 
the "rate" source if a test intends to add data periodically in the background, 
or needs to refer to a source by its short name, since "memory" doesn't have a 
source provider.

## How was this patch tested?

Ran the relevant test suite from the IDE.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-23688

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21152.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21152


commit 5aac856b3ef0118d174f016fc6a476f0facf174b
Author: Jungtaek Lim <kabhwan@...>
Date:   2018-04-25T09:46:30Z

[SPARK-23688][SS] Refactor tests away from rate source

* replace rate source with memory source in continuous mode




---




[GitHub] spark issue #21136: [SPARK-24061][SS]Add TypedFilter support for continuous ...

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21136
  
LGTM. This is what I also found so far today.


---




[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-24 Thread HeartSaVioR
Github user HeartSaVioR commented on the pull request:

https://github.com/apache/spark/pull/9812#issuecomment-159347482
  
@srowen @vanzin @holdenk Thanks for reviewing and merging!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the pull request:

https://github.com/apache/spark/pull/9812#issuecomment-158338205
  
@vanzin Thanks for reviewing, I addressed your comment. Please take a look 
again.


---



[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the pull request:

https://github.com/apache/spark/pull/9812#issuecomment-158377559
  
The failed tests seem unrelated.


---



[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-18 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/9812

[SPARK-11818][REPL] Fix ExecutorClassLoader to lookup resources from …

…parent class loader

Without the patch, two additional tests in ExecutorClassLoaderSuite fail.

- "resource from parent"
- "resources from parent"

A detailed explanation is here: 
https://issues.apache.org/jira/browse/SPARK-11818?focusedCommentId=15011202&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15011202

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-11818

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/9812.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9812


commit 698159a0a1fcf8c0ba042daf875d037df3f8ed6f
Author: Jungtaek Lim <kabh...@gmail.com>
Date:   2015-11-18T15:21:18Z

[SPARK-11818][REPL] Fix ExecutorClassLoader to lookup resources from parent 
class loader

* Without patch, some tests of ExecutorClassLoaderSuite fail




---



[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/9812#discussion_r45288354
  
--- Diff: 
repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala ---
@@ -55,6 +57,14 @@ class ExecutorClassLoader(conf: SparkConf, classUri: 
String, parent: ClassLoader
 }
   }
 
+  override def getResource(name: String): URL = {
+parentLoader.getResource(name)
--- End diff --

@vanzin 
To clarify what you mean by "feature": do you want me to change the 
implementation of findResource() and findResources() to return URLs pointing to 
the origin scheme, and not worry about potential oddities? Or should I forget 
about finding resources from the REPL URI and leave this PR as is?


---



[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/9812#discussion_r45288990
  
--- Diff: 
repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala ---
@@ -55,6 +57,14 @@ class ExecutorClassLoader(conf: SparkConf, classUri: 
String, parent: ClassLoader
 }
   }
 
+  override def getResource(name: String): URL = {
+parentLoader.getResource(name)
--- End diff --

@vanzin 
OK, Thanks for clarification! :)


---



[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/9812#discussion_r45285698
  
--- Diff: 
repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala ---
@@ -55,6 +57,14 @@ class ExecutorClassLoader(conf: SparkConf, classUri: 
String, parent: ClassLoader
 }
   }
 
+  override def getResource(name: String): URL = {
+parentLoader.getResource(name)
--- End diff --

@vanzin 
I haven't seen the use case you mentioned, but it could make sense.

To achieve it, we have to implement findResource() and findResources() for 
ExecutorClassLoader, since ExecutorClassLoader cannot rely on its superclass 
(ClassLoader) to load classes / resources.
It is easy to provide a resource URL that points to the origin scheme (http, 
https, ftp, hdfs), but since I'm new to class loaders, I'm wondering whether it 
is safe to return a URL from findResource() and findResources() that doesn't 
point to a local file.

If it is not safe to return a URL for a non-local file, what is the 
recommended way to do this?
I can only think of downloading the files to a local temp directory on every 
call.


---



[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/9812#discussion_r45260904
  
--- Diff: core/src/main/scala/org/apache/spark/TestUtils.scala ---
@@ -159,6 +159,16 @@ private[spark] object TestUtils {
 createCompiledClass(className, destDir, sourceFile, classpathUrls)
   }
 
+  def createResource(
--- End diff --

@srowen Thanks, I'll inline it.


---



[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/9812#discussion_r45261349
  
--- Diff: 
repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala ---
@@ -55,6 +57,14 @@ class ExecutorClassLoader(conf: SparkConf, classUri: 
String, parent: ClassLoader
 }
   }
 
+  override def getResource(name: String): URL = {
+parentLoader.getResource(name)
--- End diff --

@srowen 
It doesn't need to check ```userClassPathFirst```, since this implementation 
assumes that the REPL never provides resources dynamically, so there's no need 
to look up resources from ExecutorClassLoader itself.

Btw, could this precondition be broken? I can't imagine the REPL generating 
resources.
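
To illustrate the idea, a minimal standalone sketch of a class loader that always answers resource lookups from a given parent. `ParentResourceLoader` is a hypothetical, simplified class and not the actual ExecutorClassLoader; the `getResources` override is included here as an assumption to keep both lookup methods consistent:

```scala
import java.net.URL
import java.util.{Enumeration => JEnumeration}

// Hypothetical, simplified loader: it has no JVM-level parent (null), and
// resource lookups are forwarded explicitly to `parentLoader`.
class ParentResourceLoader(parentLoader: ClassLoader) extends ClassLoader(null) {

  // Single-resource lookup is answered by the parent only.
  override def getResource(name: String): URL =
    parentLoader.getResource(name)

  // Multi-resource lookup is answered by the parent only.
  override def getResources(name: String): JEnumeration[URL] =
    parentLoader.getResources(name)
}
```

Because the loader itself never contributes resources, there is nothing to order against the parent, which is why a `userClassPathFirst` check would have no effect in this sketch.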


---


