[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...

2014-10-27 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/2871#issuecomment-60716720
  
LGTM, too.  Very clever testing strategy!





[GitHub] spark pull request: [SPARK-4108][sql]

2014-10-27 Thread liancheng
Github user liancheng commented on the pull request:

https://github.com/apache/spark/pull/2970#issuecomment-60716619
  
LGTM, thanks!





[GitHub] spark pull request: [SPARK-4108][sql]

2014-10-27 Thread liancheng
Github user liancheng commented on the pull request:

https://github.com/apache/spark/pull/2970#issuecomment-60716625
  
Test this please





[GitHub] spark pull request: fix broken links in README.md

2014-10-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/2859





[GitHub] spark pull request: [SPARK-3904] [SQL] add constant objectinspecto...

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2762#issuecomment-60716603
  
[Test build #22345 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22345/consoleFull) for PR 2762 at commit [`782722f`](https://github.com/apache/spark/commit/782722fc78e00f44cb1c23645cd57b819edceac9).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...

2014-10-27 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2871#discussion_r19455404
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -157,14 +161,12 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
 out.defaultWriteObject()
   }
 
-  /** Used by the JVM when deserializing this object. */
-  private def readObject(in: ObjectInputStream): Unit = 
Utils.tryOrIOException {
-in.defaultReadObject()
+  private def readBroadcastBlock(): T = Utils.tryOrIOException {
--- End diff --

I don't think that `Utils.tryOrIOException` is _strictly_ necessary here, 
since this method will no longer be called from `readObject()`, so we won't 
have to worry about any exceptions that it throws being swallowed inside of a 
Java deserializer.  I added the `tryOrIOException` blocks as part of a general 
campaign to combat error-reporting issues in our custom Externalizable / 
Serializable classes.  It's fine to use this here, though, if you want to 
restrict the range of possible exceptions that can be thrown when calling 
`getValue()`.
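
As an illustration of the pattern being described, here is a minimal sketch (not Spark's actual `Utils.tryOrIOException`; the `Example` class is hypothetical) of wrapping `readObject()` work so that non-fatal exceptions are re-thrown as `IOException` instead of being swallowed by Java deserialization:

```scala
import java.io.{IOException, ObjectInputStream}
import scala.util.control.NonFatal

object SerializationUtils {
  // Re-throw non-fatal exceptions as IOException so an ObjectInputStream
  // surfaces them instead of swallowing them during deserialization.
  def tryOrIOException[T](block: => T): T = {
    try {
      block
    } catch {
      case e: IOException => throw e
      case NonFatal(t) => throw new IOException(t)
    }
  }
}

// Hypothetical Serializable class using the pattern in its readObject hook.
class Example(@transient var state: Map[String, Int]) extends Serializable {
  private def readObject(in: ObjectInputStream): Unit =
    SerializationUtils.tryOrIOException {
      in.defaultReadObject()
      state = Map.empty // re-initialize transient state; may throw
    }
}
```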





[GitHub] spark pull request: [SPARK-3904] [SQL] add constant objectinspecto...

2014-10-27 Thread liancheng
Github user liancheng commented on the pull request:

https://github.com/apache/spark/pull/2762#issuecomment-60716472
  
test this please





[GitHub] spark pull request: fix broken links in README.md

2014-10-27 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/2859#issuecomment-60716464
  
Merging this. Thanks.





[GitHub] spark pull request: [ SPARK-1812] Adjust build system and tests to...

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2615#issuecomment-60716442
  
[Test build #22343 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22343/consoleFull) for PR 2615 at commit [`22858c8`](https://github.com/apache/spark/commit/22858c8cac6aa94f250c851f24c357ff853faf23).
 * This patch merges cleanly.





[GitHub] spark pull request: [Spark 3922] Refactor spark-core to use Utils....

2014-10-27 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/2781#issuecomment-60716440
  
Jenkins, retest this please.





[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2969#discussion_r19455324
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TachyonStore.scala 
---
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
   return None
 }
 val is = file.getInStream(ReadType.CACHE)
-var buffer: ByteBuffer = null
+assert (is != null)
 try {
-  if (is != null) {
-val size = file.length
-val bs = new Array[Byte](size.asInstanceOf[Int])
-val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
-buffer = ByteBuffer.wrap(bs)
-if (fetchSize != size) {
-  logWarning(s"Failed to fetch the block $blockId from Tachyon: 
Size $size " +
-s"is not equal to fetched size $fetchSize")
-  return None
-}
-  }
+  val size = file.length
+  val bs = new Array[Byte](size.asInstanceOf[Int])
+  ByteStreams.readFully(is, bs)
+  Some(ByteBuffer.wrap(bs))
 } catch {
--- End diff --

OK, you convinced me on this one.

Are there any other types of exceptions (not errors) that can be thrown? We
shouldn't throw an arbitrary exception when the implicit contract of the API is
already to return None if it fails.





[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2871#discussion_r19455293
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -173,15 +175,21 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
   val time = (System.nanoTime() - start) / 1e9
   logInfo("Reading broadcast variable " + id + " took " + time + " 
s")
--- End diff --

Actually, if you need to change Josh's other comment, can you fix this one as
well? :) Thanks.





[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...

2014-10-27 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2871#discussion_r19455263
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -173,15 +175,21 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
   val time = (System.nanoTime() - start) / 1e9
   logInfo("Reading broadcast variable " + id + " took " + time + " 
s")
 
-  _value =
-TorrentBroadcast.unBlockifyObject[T](blocks, 
SparkEnv.get.serializer, compressionCodec)
+  val obj = TorrentBroadcast.unBlockifyObject[T](
+blocks, SparkEnv.get.serializer, compressionCodec)
   // Store the merged copy in BlockManager so other tasks on this 
executor don't
   // need to re-fetch it.
   SparkEnv.get.blockManager.putSingle(
-broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster 
= false)
+broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = 
false)
+  obj
   }
 }
   }
+
+  /** Used by the JVM when deserializing this object. */
+  private def readObject(in: ObjectInputStream) {
--- End diff --

Do we still need this method if it's just going to call the default 
readObject?





[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...

2014-10-27 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2871#discussion_r19455214
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -173,15 +175,21 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
   val time = (System.nanoTime() - start) / 1e9
   logInfo("Reading broadcast variable " + id + " took " + time + " 
s")
--- End diff --

+1.  I've seen several cases where this type of human-friendly formatting 
has resulted in log messages that report things like "spilling 0 MB to disk" or 
"task took 0 seconds", which isn't helpful while debugging.





[GitHub] spark pull request: [ SPARK-1812] Adjust build system and tests to...

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2615#issuecomment-60715842
  
[Test build #22342 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22342/consoleFull) for PR 2615 at commit [`e5786c8`](https://github.com/apache/spark/commit/e5786c8948971bd408bb6e3ebb05f72e9b75b915).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19455162
  
--- Diff: python/pyspark/mllib/feature.py ---
@@ -18,59 +18,348 @@
 """
 Python package for feature in MLlib.
 """
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+   'HashTF', 'IDFModel', 'IDF',
+   'Word2Vec', 'Word2VecModel']
+
+
+# Hack for support float('inf') in Py4j
+old_smart_decode = py4j.protocol.smart_decode
+
+float_str_mapping = {
+u'nan': u'NaN',
+u'inf': u'Infinity',
+u'-inf': u'-Infinity',
+}
+
+
+def new_smart_decode(obj):
+if isinstance(obj, float):
+s = unicode(obj)
+return float_str_mapping.get(s, s)
+return old_smart_decode(obj)
+
+py4j.protocol.smart_decode = new_smart_decode
+
+
+# TODO: move these helper functions into utils
+_picklable_classes = [
+'LinkedList',
+'SparseVector',
+'DenseVector',
+'DenseMatrix',
+'Rating',
+'LabeledPoint',
+]
+
+
+def _py2java(sc, a):
+""" Convert Python object into Java """
+if isinstance(a, RDD):
+a = _to_java_object_rdd(a)
+elif not isinstance(a, (int, long, float, bool, basestring)):
+bytes = bytearray(PickleSerializer().dumps(a))
+a = sc._jvm.SerDe.loads(bytes)
+return a
+
+
+def _java2py(sc, r):
+if isinstance(r, JavaObject):
+clsName = r.getClass().getSimpleName()
+if clsName in ("RDD", "JavaRDD"):
+if clsName == "RDD":
+r = r.toJavaRDD()
+jrdd = sc._jvm.SerDe.javaToPython(r)
+return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
 
-__all__ = ['Word2Vec', 'Word2VecModel']
+elif clsName in _picklable_classes:
+r = sc._jvm.SerDe.dumps(r)
 
+if isinstance(r, bytearray):
+r = PickleSerializer().loads(str(r))
+return r
 
-class Word2VecModel(object):
+
+def _callJavaFunc(sc, func, *args):
+""" Call Java Function
 """
-class for Word2Vec model
+args = [_py2java(sc, a) for a in args]
+return _java2py(sc, func(*args))
+
+
+def _callAPI(sc, name, *args):
+""" Call API in PythonMLLibAPI
 """
-def __init__(self, sc, java_model):
+api = getattr(sc._jvm.PythonMLLibAPI(), name)
+return _callJavaFunc(sc, api, *args)
+
+
+class VectorTransformer(object):
+"""
+:: DeveloperApi ::
+Base class for transformation of a vector or RDD of vector
+"""
+def transform(self, vector):
 """
-:param sc:  Spark context
-:param java_model:  Handle to Java model object
+Applies transformation on a vector.
+
+:param vector: vector to be transformed.
 """
+raise NotImplementedError
+
+
+class Normalizer(VectorTransformer):
+"""
+:: Experimental ::
+Normalizes samples individually to unit L^p^ norm
+
+For any 1 <= p <= float('inf'), normalizes samples using
+sum(abs(vector).^p^)^(1/p)^ as norm.
+
+For p = float('inf'), max(abs(vector)) will be used as norm for 
normalization.
+
+>>> v = Vectors.dense(range(3))
+>>> nor = Normalizer(1)
+>>> nor.transform(v)
+DenseVector([0.0, 0.3333, 0.6667])
+
+>>> rdd = sc.parallelize([v])
+>>> nor.transform(rdd).collect()
+[DenseVector([0.0, 0.3333, 0.6667])]
+
+>>> nor2 = Normalizer(float("inf"))
+>>> nor2.transform(v)
+DenseVector([0.0, 0.5, 1.0])
+"""
+def __init__(self, p=2):
+"""
+:param p: Normalization in L^p^ space, p = 2 by default.
+"""
+assert p >= 1.0, "p should be greater than 1.0"
+self.p = float(p)
+
+def transform(self, vector):
+"""
+Applies unit length normalization on a vector.
+
+:param vector: vector to be normalized.
+:return: normalized vector. If the norm of the input is zero, it
+will return the input vector.
+"""
+sc = SparkContext._active_spark_context
+assert s

[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19455165
  
--- Diff: python/pyspark/mllib/feature.py ---
@@ -95,33 +385,26 @@ class Word2Vec(object):
 >>> localDoc = [sentence, sentence]
 >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
 >>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc)
+
 >>> syms = model.findSynonyms("a", 2)
->>> str(syms[0][0])
-'b'
->>> str(syms[1][0])
-'c'
->>> len(syms)
-2
+>>> [s[0] for s in syms]
+[u'b', u'c']
 >>> vec = model.transform("a")
->>> len(vec)
-10
 >>> syms = model.findSynonyms(vec, 2)
->>> str(syms[0][0])
-'b'
->>> str(syms[1][0])
-'c'
->>> len(syms)
-2
+>>> [s[0] for s in syms]
+[u'b', u'c']
 """
 def __init__(self):
 """
 Construct Word2Vec instance
 """
+import random  # this can't be on the top because of mllib.random
+
 self.vectorSize = 100
 self.learningRate = 0.025
 self.numPartitions = 1
 self.numIterations = 1
-self.seed = 42L
+self.seed = random.randint(0, sys.maxint)
--- End diff --

ok





[GitHub] spark pull request: [ SPARK-1812] Adjust build system and tests to...

2014-10-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2615#issuecomment-60715844
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22342/
Test FAILed.





[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...

2014-10-27 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2969#discussion_r19455153
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TachyonStore.scala 
---
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
   return None
 }
 val is = file.getInStream(ReadType.CACHE)
-var buffer: ByteBuffer = null
+assert (is != null)
 try {
-  if (is != null) {
-val size = file.length
-val bs = new Array[Byte](size.asInstanceOf[Int])
-val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
-buffer = ByteBuffer.wrap(bs)
-if (fetchSize != size) {
-  logWarning(s"Failed to fetch the block $blockId from Tachyon: 
Size $size " +
-s"is not equal to fetched size $fetchSize")
-  return None
-}
-  }
+  val size = file.length
+  val bs = new Array[Byte](size.asInstanceOf[Int])
+  ByteStreams.readFully(is, bs)
+  Some(ByteBuffer.wrap(bs))
 } catch {
--- End diff --

Ah, gotcha.

From a general API design perspective, I think it's a little weird to have 
methods that swallow OOMs and resurface them as other errors.  As a library 
consumer, how would you feel if, say, Snappy were to swallow OOMs and re-throw 
them as IOException?  A library with that sort of unexpected behavior might 
actually break user applications' ability to recover from OOMs: if we ran out 
of memory due to excessive memory usage in some other part of the app but the 
OOM happened to be caught and swallowed by the library, then the top-level 
uncaught exception handler might never get a chance to respond to the OOM in an 
application-specific way (e.g. attempt to close files, trigger GCs, or clear 
caches).

I'm not against handling OOMs in principle, but I think that it should 
probably be done at higher levels of the stack since that seems easier to 
reason about.
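
To make the concern concrete, here is a minimal, hypothetical sketch (not Spark code) of the kind of application-level handler that only gets a chance to respond to an OOM if lower layers let `OutOfMemoryError` propagate instead of swallowing it:

```scala
object OomAwareApp {
  def installHandler(): Unit = {
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = e match {
        case oom: OutOfMemoryError =>
          // Application-specific response: drop caches, close files, then halt.
          // If a library had caught the OOM and re-thrown it as IOException,
          // this branch would never run.
          System.err.println(s"OOM in thread ${t.getName}: ${oom.getMessage}")
          Runtime.getRuntime.halt(1)
        case other =>
          System.err.println(s"Uncaught exception in thread ${t.getName}: $other")
      }
    })
  }

  def main(args: Array[String]): Unit = {
    installHandler()
    // ... run the application ...
  }
}
```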





[GitHub] spark pull request: [SPARK-4097] Fix the race condition of 'thread...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2957#discussion_r19455126
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -210,7 +210,9 @@ class ComplexFutureAction[T] extends FutureAction[T] {
   } catch {
 case e: Exception => p.failure(e)
   } finally {
-thread = null
+ComplexFutureAction.this.synchronized {
--- End diff --

What effect does this synchronized block have? It seems to me that it is actually
not protecting anything.
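
As a generic illustration of the question (a hypothetical `Worker` class, not the actual `FutureAction` code): a synchronized block around a single write only buys anything if every other access to the same field synchronizes on the same lock.

```scala
// Hypothetical Worker class: the synchronized write in the finally block is
// only meaningful because cancel() reads the same field under the same lock;
// a lone synchronized write protects nothing.
class Worker {
  private var thread: Thread = _

  def start(): Unit = this.synchronized {
    thread = new Thread(new Runnable {
      override def run(): Unit = {
        try doWork()
        finally Worker.this.synchronized { thread = null } // paired with the locked read in cancel()
      }
    })
    thread.start()
  }

  def cancel(): Unit = this.synchronized {
    if (thread != null) thread.interrupt() // sees the null-out because it holds the same lock
  }

  private def doWork(): Unit = Thread.sleep(100)
}
```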





[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19455090
  
--- Diff: python/pyspark/mllib/feature.py ---
@@ -18,59 +18,348 @@
 """
 Python package for feature in MLlib.
 """
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+   'HashTF', 'IDFModel', 'IDF',
+   'Word2Vec', 'Word2VecModel']
+
+
+# Hack for support float('inf') in Py4j
+old_smart_decode = py4j.protocol.smart_decode
+
+float_str_mapping = {
+u'nan': u'NaN',
+u'inf': u'Infinity',
+u'-inf': u'-Infinity',
+}
+
+
+def new_smart_decode(obj):
+if isinstance(obj, float):
+s = unicode(obj)
+return float_str_mapping.get(s, s)
+return old_smart_decode(obj)
+
+py4j.protocol.smart_decode = new_smart_decode
+
+
+# TODO: move these helper functions into utils
+_picklable_classes = [
+'LinkedList',
+'SparseVector',
+'DenseVector',
+'DenseMatrix',
+'Rating',
+'LabeledPoint',
+]
+
+
+def _py2java(sc, a):
+""" Convert Python object into Java """
+if isinstance(a, RDD):
+a = _to_java_object_rdd(a)
+elif not isinstance(a, (int, long, float, bool, basestring)):
+bytes = bytearray(PickleSerializer().dumps(a))
+a = sc._jvm.SerDe.loads(bytes)
+return a
+
+
+def _java2py(sc, r):
+if isinstance(r, JavaObject):
+clsName = r.getClass().getSimpleName()
+if clsName in ("RDD", "JavaRDD"):
+if clsName == "RDD":
+r = r.toJavaRDD()
+jrdd = sc._jvm.SerDe.javaToPython(r)
+return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
 
-__all__ = ['Word2Vec', 'Word2VecModel']
+elif clsName in _picklable_classes:
+r = sc._jvm.SerDe.dumps(r)
 
+if isinstance(r, bytearray):
+r = PickleSerializer().loads(str(r))
+return r
 
-class Word2VecModel(object):
+
+def _callJavaFunc(sc, func, *args):
+""" Call Java Function
 """
-class for Word2Vec model
+args = [_py2java(sc, a) for a in args]
+return _java2py(sc, func(*args))
+
+
+def _callAPI(sc, name, *args):
+""" Call API in PythonMLLibAPI
 """
-def __init__(self, sc, java_model):
+api = getattr(sc._jvm.PythonMLLibAPI(), name)
+return _callJavaFunc(sc, api, *args)
+
+
+class VectorTransformer(object):
+"""
+:: DeveloperApi ::
+Base class for transformation of a vector or RDD of vector
+"""
+def transform(self, vector):
 """
-:param sc:  Spark context
-:param java_model:  Handle to Java model object
+Applies transformation on a vector.
+
+:param vector: vector to be transformed.
 """
+raise NotImplementedError
+
+
+class Normalizer(VectorTransformer):
+"""
+:: Experimental ::
+Normalizes samples individually to unit L^p^ norm
+
+For any 1 <= p <= float('inf'), normalizes samples using
+sum(abs(vector).^p^)^(1/p)^ as norm.
+
+For p = float('inf'), max(abs(vector)) will be used as norm for 
normalization.
+
+>>> v = Vectors.dense(range(3))
+>>> nor = Normalizer(1)
+>>> nor.transform(v)
+DenseVector([0.0, 0.3333, 0.6667])
+
+>>> rdd = sc.parallelize([v])
+>>> nor.transform(rdd).collect()
+[DenseVector([0.0, 0.3333, 0.6667])]
+
+>>> nor2 = Normalizer(float("inf"))
+>>> nor2.transform(v)
+DenseVector([0.0, 0.5, 1.0])
+"""
+def __init__(self, p=2):
--- End diff --

It will be converted into float, but having "2.0" here will be better for 
docs.



[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...

2014-10-27 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/2871#issuecomment-60715459
  
LGTM





[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19455072
  
--- Diff: python/pyspark/mllib/feature.py ---
@@ -18,59 +18,348 @@
 """
 Python package for feature in MLlib.
 """
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+   'HashTF', 'IDFModel', 'IDF',
+   'Word2Vec', 'Word2VecModel']
+
+
+# Hack for support float('inf') in Py4j
+old_smart_decode = py4j.protocol.smart_decode
+
+float_str_mapping = {
+u'nan': u'NaN',
+u'inf': u'Infinity',
+u'-inf': u'-Infinity',
+}
+
+
+def new_smart_decode(obj):
+if isinstance(obj, float):
+s = unicode(obj)
+return float_str_mapping.get(s, s)
+return old_smart_decode(obj)
+
+py4j.protocol.smart_decode = new_smart_decode
+
+
+# TODO: move these helper functions into utils
+_picklable_classes = [
+'LinkedList',
+'SparseVector',
+'DenseVector',
+'DenseMatrix',
+'Rating',
+'LabeledPoint',
+]
+
+
+def _py2java(sc, a):
+""" Convert Python object into Java """
+if isinstance(a, RDD):
+a = _to_java_object_rdd(a)
+elif not isinstance(a, (int, long, float, bool, basestring)):
+bytes = bytearray(PickleSerializer().dumps(a))
+a = sc._jvm.SerDe.loads(bytes)
+return a
+
+
+def _java2py(sc, r):
+if isinstance(r, JavaObject):
+clsName = r.getClass().getSimpleName()
+if clsName in ("RDD", "JavaRDD"):
+if clsName == "RDD":
+r = r.toJavaRDD()
+jrdd = sc._jvm.SerDe.javaToPython(r)
+return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
 
-__all__ = ['Word2Vec', 'Word2VecModel']
+elif clsName in _picklable_classes:
+r = sc._jvm.SerDe.dumps(r)
 
+if isinstance(r, bytearray):
+r = PickleSerializer().loads(str(r))
+return r
 
-class Word2VecModel(object):
+
+def _callJavaFunc(sc, func, *args):
+""" Call Java Function
 """
-class for Word2Vec model
+args = [_py2java(sc, a) for a in args]
+return _java2py(sc, func(*args))
+
+
+def _callAPI(sc, name, *args):
+""" Call API in PythonMLLibAPI
 """
-def __init__(self, sc, java_model):
+api = getattr(sc._jvm.PythonMLLibAPI(), name)
+return _callJavaFunc(sc, api, *args)
+
+
+class VectorTransformer(object):
+"""
+:: DeveloperApi ::
+Base class for transformation of a vector or RDD of vector
+"""
+def transform(self, vector):
 """
-:param sc:  Spark context
-:param java_model:  Handle to Java model object
+Applies transformation on a vector.
+
+:param vector: vector to be transformed.
 """
+raise NotImplementedError
+
+
+class Normalizer(VectorTransformer):
+"""
+:: Experimental ::
+Normalizes samples individually to unit L^p^ norm
+
+For any 1 <= p <= float('inf'), normalizes samples using
+sum(abs(vector).^p^)^(1/p)^ as norm.
+
+For p = float('inf'), max(abs(vector)) will be used as norm for 
normalization.
+
+>>> v = Vectors.dense(range(3))
+>>> nor = Normalizer(1)
+>>> nor.transform(v)
+DenseVector([0.0, 0.3333, 0.6667])
+
+>>> rdd = sc.parallelize([v])
+>>> nor.transform(rdd).collect()
+[DenseVector([0.0, 0.3333, 0.6667])]
+
+>>> nor2 = Normalizer(float("inf"))
+>>> nor2.transform(v)
+DenseVector([0.0, 0.5, 1.0])
+"""
+def __init__(self, p=2):
+"""
+:param p: Normalization in L^p^ space, p = 2 by default.
+"""
+assert p >= 1.0, "p should be greater than 1.0"
+self.p = float(p)
+
+def transform(self, vector):
+"""
+Applies unit length normalization on a vector.
+
+:param vector: vector to be normalized.
+:return: normalized vector. If the norm of the input is zero, it
+will return the input vector.
+"""
+sc = SparkContext._active_spark_context
+assert s

[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2871#discussion_r19455067
  
--- Diff: 
core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala ---
@@ -21,11 +21,28 @@ import scala.util.Random
 
 import org.scalatest.{Assertions, FunSuite}
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkException}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkException, SparkEnv}
 import org.apache.spark.io.SnappyCompressionCodec
+import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.storage._
 
+// Dummy class that creates a broadcast variable but doesn't use it
+class DummyBroadcastClass(rdd: RDD[Int]) extends Serializable {
+  @transient val list = List(1, 2, 3, 4)
+  val broadcast = rdd.context.broadcast(list)
+  val bid = broadcast.id
+
+  def doSomething() = {
+rdd.map { x =>
--- End diff --

this is pretty neat





[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2871#discussion_r19455054
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -173,15 +175,21 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
   val time = (System.nanoTime() - start) / 1e9
   logInfo("Reading broadcast variable " + id + " took " + time + " 
s")
--- End diff --

this is not your change, but we should log ms instead of s here. i will fix 
it later.
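
A standalone sketch of the suggested change, using `println` in place of Spark's `logInfo` and a placeholder broadcast id:

```scala
object TimingLogExample {
  def main(args: Array[String]): Unit = {
    val id = 0L                    // hypothetical broadcast id, for illustration only
    val start = System.nanoTime()
    Thread.sleep(5)                // stand-in for reading the broadcast blocks
    val elapsedMs = (System.nanoTime() - start) / 1e6
    // Millisecond resolution avoids log lines like "took 0 s" for fast reads.
    println(f"Reading broadcast variable $id took $elapsedMs%.1f ms")
  }
}
```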





[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2969#discussion_r19455025
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TachyonStore.scala 
---
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
   return None
 }
 val is = file.getInStream(ReadType.CACHE)
-var buffer: ByteBuffer = null
+assert (is != null)
 try {
-  if (is != null) {
-val size = file.length
-val bs = new Array[Byte](size.asInstanceOf[Int])
-val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
-buffer = ByteBuffer.wrap(bs)
-if (fetchSize != size) {
-  logWarning(s"Failed to fetch the block $blockId from Tachyon: 
Size $size " +
-s"is not equal to fetched size $fetchSize")
-  return None
-}
-  }
+  val size = file.length
+  val bs = new Array[Byte](size.asInstanceOf[Int])
+  ByteStreams.readFully(is, bs)
+  Some(ByteBuffer.wrap(bs))
 } catch {
--- End diff --

I was not suggesting your update was inconsistent with the old implementation.
I merely pointed out that this is a good place to check for more types of errors,
because the caller is usually not set up to deal with an OOM, but it probably is
set up to deal with None.





[GitHub] spark pull request: [SPARK-4064]NioBlockTransferService.fetchBlock...

2014-10-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/2929





[GitHub] spark pull request: [SPARK-3795] Heuristics for dynamically scalin...

2014-10-27 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/2746#issuecomment-60715271
  
@andrewor14 this is looking good. All my comments are about some 
refactoring to make it more testable, and dealing with error conditions.





[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19454983
  
--- Diff: python/pyspark/mllib/feature.py ---
@@ -95,33 +385,26 @@ class Word2Vec(object):
 >>> localDoc = [sentence, sentence]
 >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
 >>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc)
+
 >>> syms = model.findSynonyms("a", 2)
->>> str(syms[0][0])
-'b'
->>> str(syms[1][0])
-'c'
->>> len(syms)
-2
+>>> [s[0] for s in syms]
+[u'b', u'c']
 >>> vec = model.transform("a")
->>> len(vec)
-10
 >>> syms = model.findSynonyms(vec, 2)
->>> str(syms[0][0])
-'b'
->>> str(syms[1][0])
-'c'
->>> len(syms)
-2
+>>> [s[0] for s in syms]
+[u'b', u'c']
 """
 def __init__(self):
 """
 Construct Word2Vec instance
 """
+import random  # this can't be on the top because of mllib.random
+
 self.vectorSize = 100
 self.learningRate = 0.025
 self.numPartitions = 1
 self.numIterations = 1
-self.seed = 42L
+self.seed = random.randint(0, sys.maxint)
--- End diff --

This has nothing to do with numpy, so it will have problems.





[GitHub] spark pull request: [SPARK-3904] [SQL] add constant objectinspecto...

2014-10-27 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/2762#issuecomment-60715086
  
test this please.





[GitHub] spark pull request: [SPARK-4064]NioBlockTransferService.fetchBlock...

2014-10-27 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/2929#issuecomment-60715040
  
Thanks. I'm merging this one.





[GitHub] spark pull request: [SPARK-4064]NioBlockTransferService.fetchBlock...

2014-10-27 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/2929#issuecomment-60715056
  
@aarondav this will conflict with your pr.





[GitHub] spark pull request: [ SPARK-1812] Adjust build system and tests to...

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2615#issuecomment-60714945
  
[Test build #22342 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22342/consoleFull) for PR 2615 at commit [`e5786c8`](https://github.com/apache/spark/commit/e5786c8948971bd408bb6e3ebb05f72e9b75b915).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4133] [SQL] [PySpark] type conversionfo...

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2973#issuecomment-60714816
  
[Test build #22341 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22341/consoleFull) for PR 2973 at commit [`35caa4f`](https://github.com/apache/spark/commit/35caa4fe7d87af752fb2df5bbd3e01e69784920d).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4133] [SQL] [PySpark] type conversionfo...

2014-10-27 Thread davies
GitHub user davies opened a pull request:

https://github.com/apache/spark/pull/2973

[SPARK-4133] [SQL] [PySpark] type conversion for python udf

Python UDFs can be called on ArrayType/MapType/PrimitiveType values, and the
returnType can also be ArrayType/MapType/PrimitiveType.

For StructType, the value acts as a tuple (without attributes). If returnType is
StructType, the returned value should also be a tuple.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/davies/spark udf_array

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2973.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2973


commit 35caa4fe7d87af752fb2df5bbd3e01e69784920d
Author: Davies Liu 
Date:   2014-10-28T06:18:10Z

type conversionfor python udf







[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...

2014-10-27 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2969#discussion_r19454860
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TachyonStore.scala 
---
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
   return None
 }
 val is = file.getInStream(ReadType.CACHE)
-var buffer: ByteBuffer = null
+assert (is != null)
 try {
-  if (is != null) {
-val size = file.length
-val bs = new Array[Byte](size.asInstanceOf[Int])
-val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
-buffer = ByteBuffer.wrap(bs)
-if (fetchSize != size) {
-  logWarning(s"Failed to fetch the block $blockId from Tachyon: 
Size $size " +
-s"is not equal to fetched size $fetchSize")
-  return None
-}
-  }
+  val size = file.length
+  val bs = new Array[Byte](size.asInstanceOf[Int])
+  ByteStreams.readFully(is, bs)
+  Some(ByteBuffer.wrap(bs))
 } catch {
--- End diff --

I think that catching OutOfMemoryError is inconsistent with the rest of the 
code base.  The only places where we catch it are for the purposes of logging 
more information about why an executor died.

I don't think that any of the changes that I've made here are inconsistent 
with the old implementation: if we fail to fetch, we still return None, since 
ByteStreams.readFully throws IOException when it encounters errors.

Besides, couldn't the OOM have occurred due to some other thread starving 
this one of memory?
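
A minimal sketch of the contract under discussion (a hypothetical `readBlock` helper, not the actual `TachyonStore` code): read the whole block with `ByteStreams.readFully` and map I/O failures to `None`, while letting errors such as `OutOfMemoryError` propagate:

```scala
import java.io.{IOException, InputStream}
import java.nio.ByteBuffer

import com.google.common.io.ByteStreams

object BlockReadSketch {
  // Read the full block and translate I/O failures into None, matching the
  // Option-based contract discussed above. OutOfMemoryError is deliberately
  // not caught here, so it can reach the top-level handler.
  def readBlock(is: InputStream, size: Int): Option[ByteBuffer] = {
    try {
      val bytes = new Array[Byte](size)
      ByteStreams.readFully(is, bytes) // throws IOException (or EOFException) on short reads
      Some(ByteBuffer.wrap(bytes))
    } catch {
      case e: IOException =>
        System.err.println(s"Failed to read block: ${e.getMessage}")
        None
    } finally {
      is.close()
    }
  }
}
```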





[GitHub] spark pull request: [SPARK-3343] [SQL] Add serde support for CTAS

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2570#issuecomment-60714742
  
[Test build #22337 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22337/consoleFull) for PR 2570 at commit [`e011ef5`](https://github.com/apache/spark/commit/e011ef557be3f438c5052e65462d5fbb89b51b6d).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class CreateTableAsSelect[T](`
  * `  logDebug("Found class for $serdeName")`






[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454844
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.rdd.BlockRDD
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
HdfsUtils, WriteAheadLogRandomReader}
+import org.apache.spark._
--- End diff --

import out of order





[GitHub] spark pull request: [SPARK-3343] [SQL] Add serde support for CTAS

2014-10-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2570#issuecomment-60714744
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22337/
Test PASSed.





[GitHub] spark pull request: [SPARK-3795] Heuristics for dynamically scalin...

2014-10-27 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/2746#discussion_r19454814
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.mutable
+
+import org.apache.spark.scheduler._
+
+/**
+ * An agent that dynamically allocates and removes executors based on the workload.
+ *
+ * The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
+ * the scheduler queue is not drained in N seconds, then new executors are added. If the queue
+ * persists for another M seconds, then more executors are added and so on. The number added
+ * in each round increases exponentially from the previous round until an upper bound on the
+ * number of executors has been reached.
+ *
+ * The rationale for the exponential increase is twofold: (1) Executors should be added slowly
+ * in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
+ * we may add more executors than we need just to remove them later. (2) Executors should be added
+ * quickly over time in case the maximum number of executors is very high. Otherwise, it will take
+ * a long time to ramp up under heavy workloads.
+ *
+ * The remove policy is simpler: If an executor has been idle for K seconds, meaning it has not
+ * been scheduled to run any tasks, then it is removed.
+ *
+ * There is no retry logic in either case because we make the assumption that the cluster manager
+ * will eventually fulfill all requests it receives asynchronously.
+ *
+ * The relevant Spark properties include the following:
+ *
+ *   spark.dynamicAllocation.enabled - Whether this feature is enabled
+ *   spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
+ *   spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
+ *
+ *   spark.dynamicAllocation.schedulerBacklogTimeout (N) -
+ *     If there are backlogged tasks for this duration, add new executors
+ *
+ *   spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (M) -
+ *     If the backlog is sustained for this duration, add more executors
+ *     This is used only after the initial backlog timeout is exceeded
+ *
+ *   spark.dynamicAllocation.executorIdleTimeout (K) -
+ *     If an executor has been idle for this duration, remove it
+ */
+private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging {
+  import ExecutorAllocationManager._
+
+  private val conf = sc.conf
+
+  // Lower and upper bounds on the number of executors. These are required.
+  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
+  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
+  if (minNumExecutors < 0 || maxNumExecutors < 0) {
+    throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
+  }
+  if (minNumExecutors > maxNumExecutors) {
+    throw new SparkException("spark.dynamicAllocation.minExecutors must " +
+      "be less than or equal to spark.dynamicAllocation.maxExecutors!")
+  }
+
+  // How long there must be backlogged tasks for before an addition is triggered
+  private val schedulerBacklogTimeout = conf.getLong(
+    "spark.dynamicAllocation.schedulerBacklogTimeout", 60)
+
+  // Same as above, but used only after `schedulerBacklogTimeout` is exceeded
+  private val sustainedSchedulerBacklogTimeout = conf.getLong(
+    "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
+
+  // How long an executor must be idle for before it is removed
+  private val removeThresholdSeconds = conf.getLong(
+    "spark.dynamicAllocation.executorIdleTimeout", 600)
+
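For reference, a minimal sketch of how the properties listed in the scaladoc above would be set when constructing a SparkContext. This is not part of the patch; the concrete values (2, 20, 60, 600) are placeholder assumptions, not recommended defaults.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver-side configuration using the property names documented above.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-sketch")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "60")
  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "60")
  .set("spark.dynamicAllocation.executorIdleTimeout", "600")
val sc = new SparkContext(conf)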
+

[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454823
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.rdd.BlockRDD
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
HdfsUtils, WriteAheadLogRandomReader}
+import org.apache.spark._
+
+private[streaming]
+class HDFSBackedBlockRDDPartition(
+val blockId: BlockId,
+val index: Int,
+val segment: WriteAheadLogFileSegment
+  ) extends Partition
+
+private[streaming]
+class HDFSBackedBlockRDD[T: ClassTag](
--- End diff --

It would be great to add javadoc explaining what this class is for. If it 
is used for recovery, why should we put the blocks in block manager after using 
them? Shouldn't recovery data be used only once during a recovery?





[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454808
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.rdd.BlockRDD
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
HdfsUtils, WriteAheadLogRandomReader}
+import org.apache.spark._
+
+private[streaming]
+class HDFSBackedBlockRDDPartition(
+val blockId: BlockId,
+val index: Int,
+val segment: WriteAheadLogFileSegment
+  ) extends Partition
+
+private[streaming]
+class HDFSBackedBlockRDD[T: ClassTag](
--- End diff --

I don't think it makes sense to tie this to WriteAheadLogFileSegment. On the one hand the name HDFSBackedBlockRDD is supposed to be general; on the other hand you tie it to recovery through the use of WriteAheadLogFileSegment. 





[GitHub] spark pull request: [ SPARK-1812] Adjust build system and tests to...

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2615#issuecomment-60714610
  
  [Test build #22340 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22340/consoleFull)
 for   PR 2615 at commit 
[`06a82b9`](https://github.com/apache/spark/commit/06a82b961f5ba08669b28810aa4dcae001428d8c).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2871#issuecomment-60714605
  
  [Test build #22335 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22335/consoleFull)
 for   PR 2871 at commit 
[`d6c5ee9`](https://github.com/apache/spark/commit/d6c5ee9629e4bc894913a4d181ca8ed963472e49).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...

2014-10-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2871#issuecomment-60714608
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22335/
Test PASSed.





[GitHub] spark pull request: [ SPARK-1812] Adjust build system and tests to...

2014-10-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2615#issuecomment-60714611
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22340/
Test FAILed.





[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2969#discussion_r19454743
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TachyonStore.scala 
---
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
   return None
 }
 val is = file.getInStream(ReadType.CACHE)
-var buffer: ByteBuffer = null
+assert (is != null)
 try {
-  if (is != null) {
-val size = file.length
-val bs = new Array[Byte](size.asInstanceOf[Int])
-val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
-buffer = ByteBuffer.wrap(bs)
-if (fetchSize != size) {
-  logWarning(s"Failed to fetch the block $blockId from Tachyon: 
Size $size " +
-s"is not equal to fetched size $fetchSize")
-  return None
-}
-  }
+  val size = file.length
+  val bs = new Array[Byte](size.asInstanceOf[Int])
+  ByteStreams.readFully(is, bs)
+  Some(ByteBuffer.wrap(bs))
 } catch {
--- End diff --

because we are supposed to return None if this fails ... it is pretty easy 
to recover from OOM here, if the OOM is due to the creation of the array.
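For illustration, a rough sketch of the shape being suggested: catch the allocation failure around the read and fall back to None. Here `file`, `is`, and `blockId` come from the surrounding TachyonStore code quoted above; this is only a sketch, not the final patch.

try {
  val size = file.length
  // Allocating the full buffer is what can throw OutOfMemoryError here
  val bs = new Array[Byte](size.asInstanceOf[Int])
  ByteStreams.readFully(is, bs)
  Some(ByteBuffer.wrap(bs))
} catch {
  case e: OutOfMemoryError =>
    logWarning(s"Failed to allocate ${file.length} bytes for block $blockId", e)
    None
} finally {
  is.close()
}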





[GitHub] spark pull request: [ SPARK-1812] Adjust build system and tests to...

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2615#issuecomment-60714433
  
  [Test build #22340 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22340/consoleFull)
 for   PR 2615 at commit 
[`06a82b9`](https://github.com/apache/spark/commit/06a82b961f5ba08669b28810aa4dcae001428d8c).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3954][Streaming] promote the speed of c...

2014-10-27 Thread surq
Github user surq commented on the pull request:

https://github.com/apache/spark/pull/2811#issuecomment-60714301
  

@tdas 
This patch has been open for some time now; if you have time, please take another look at it.
@jerryshao 
Thanks for your kind help.





[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454664
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.rdd.BlockRDD
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
HdfsUtils, WriteAheadLogRandomReader}
+import org.apache.spark._
+
+private[streaming]
+class HDFSBackedBlockRDDPartition(
+val blockId: BlockId,
+val index: Int,
+val segment: WriteAheadLogFileSegment
+  ) extends Partition
+
+private[streaming]
+class HDFSBackedBlockRDD[T: ClassTag](
+@transient sc: SparkContext,
+@transient hadoopConfiguration: Configuration,
+@transient override val blockIds: Array[BlockId],
+@transient val segments: Array[WriteAheadLogFileSegment],
+val storeInBlockManager: Boolean,
+val storageLevel: StorageLevel
+  ) extends BlockRDD[T](sc, blockIds) {
+
+  require(blockIds.length == segments.length,
+"Number of block ids must be the same as number of segments!")
+
+  // Hadoop Configuration is not serializable, so broadcast it as a 
serializable.
+  val broadcastedHadoopConf = sc.broadcast(new 
SerializableWritable(hadoopConfiguration))
+
+  override def getPartitions: Array[Partition] = {
+assertValid()
+(0 until blockIds.size).map { i =>
--- End diff --

Use Array.tabulate here instead of the range/map/toArray combination.
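A sketch of that rewrite (illustrative only):

override def getPartitions: Array[Partition] = {
  assertValid()
  // Array.tabulate builds the Array directly instead of mapping a Range
  // and converting it with toArray
  Array.tabulate[Partition](blockIds.length) { i =>
    new HDFSBackedBlockRDDPartition(blockIds(i), i, segments(i))
  }
}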





[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454662
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.rdd.BlockRDD
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
HdfsUtils, WriteAheadLogRandomReader}
+import org.apache.spark._
+
+private[streaming]
+class HDFSBackedBlockRDDPartition(
+val blockId: BlockId,
+val index: Int,
+val segment: WriteAheadLogFileSegment
+  ) extends Partition
+
+private[streaming]
+class HDFSBackedBlockRDD[T: ClassTag](
+@transient sc: SparkContext,
+@transient hadoopConfiguration: Configuration,
+@transient override val blockIds: Array[BlockId],
+@transient val segments: Array[WriteAheadLogFileSegment],
+val storeInBlockManager: Boolean,
+val storageLevel: StorageLevel
+  ) extends BlockRDD[T](sc, blockIds) {
+
+  require(blockIds.length == segments.length,
+"Number of block ids must be the same as number of segments!")
+
+  // Hadoop Configuration is not serializable, so broadcast it as a 
serializable.
+  val broadcastedHadoopConf = sc.broadcast(new 
SerializableWritable(hadoopConfiguration))
--- End diff --

can this be private[this]?
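That is, roughly:

// Sketch: the broadcast variable is only read inside compute(), so it can be
// restricted to the instance.
private[this] val broadcastedHadoopConf =
  sc.broadcast(new SerializableWritable(hadoopConfiguration))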





[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454635
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.rdd.BlockRDD
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
HdfsUtils, WriteAheadLogRandomReader}
+import org.apache.spark._
+
+private[streaming]
+class HDFSBackedBlockRDDPartition(
+val blockId: BlockId,
+val index: Int,
+val segment: WriteAheadLogFileSegment
+  ) extends Partition
+
+private[streaming]
+class HDFSBackedBlockRDD[T: ClassTag](
+@transient sc: SparkContext,
+@transient hadoopConfiguration: Configuration,
+@transient override val blockIds: Array[BlockId],
--- End diff --

I don't think you need to declare this as a field. It's already a field in the parent class, so you can just use that.
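For illustration, the constructor could pass the ids straight through to the parent instead of redeclaring the field; the `_blockIds` name below is only a placeholder:

private[streaming]
class HDFSBackedBlockRDD[T: ClassTag](
    @transient sc: SparkContext,
    @transient hadoopConfiguration: Configuration,
    @transient _blockIds: Array[BlockId],
    @transient val segments: Array[WriteAheadLogFileSegment],
    val storeInBlockManager: Boolean,
    val storageLevel: StorageLevel
  ) extends BlockRDD[T](sc, _blockIds) {

  // blockIds here resolves to the field inherited from BlockRDD
  require(blockIds.length == segments.length,
    "Number of block ids must be the same as number of segments!")
}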





[GitHub] spark pull request: [SPARK-3822] Executor scaling mechanism for Ya...

2014-10-27 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/2840#discussion_r19454618
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1501,8 +1533,13 @@ object SparkContext extends Logging {
 res
   }
 
-  /** Creates a task scheduler based on a given master URL. Extracted for 
testing. */
-  private def createTaskScheduler(sc: SparkContext, master: String): 
TaskScheduler = {
+  /**
+   * Create a task scheduler based on a given master URL.
+   * Return a 2-tuple of the scheduler backend and the task scheduler.
+   */
+  private def createTaskScheduler(
+  sc: SparkContext,
+  master: String): (SchedulerBackend, TaskScheduler) = {
--- End diff --

Okay - sounds good





[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454599
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.rdd.BlockRDD
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
HdfsUtils, WriteAheadLogRandomReader}
+import org.apache.spark._
+
+private[streaming]
+class HDFSBackedBlockRDDPartition(
+val blockId: BlockId,
+val index: Int,
+val segment: WriteAheadLogFileSegment
+  ) extends Partition
+
+private[streaming]
+class HDFSBackedBlockRDD[T: ClassTag](
+@transient sc: SparkContext,
+@transient hadoopConfiguration: Configuration,
+@transient override val blockIds: Array[BlockId],
+@transient val segments: Array[WriteAheadLogFileSegment],
+val storeInBlockManager: Boolean,
+val storageLevel: StorageLevel
+  ) extends BlockRDD[T](sc, blockIds) {
+
+  require(blockIds.length == segments.length,
+"Number of block ids must be the same as number of segments!")
+
+  // Hadoop Configuration is not serializable, so broadcast it as a 
serializable.
+  val broadcastedHadoopConf = sc.broadcast(new 
SerializableWritable(hadoopConfiguration))
+
+  override def getPartitions: Array[Partition] = {
+assertValid()
+(0 until blockIds.size).map { i =>
+  new HDFSBackedBlockRDDPartition(blockIds(i), i, segments(i))
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    assertValid()
+    val hadoopConf = broadcastedHadoopConf.value.value
+    val blockManager = SparkEnv.get.blockManager
+    val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
+    val blockId = partition.blockId
+    blockManager.get(blockId) match {
+      // Data is in Block Manager, grab it from there.
+      case Some(block) =>
+        block.data.asInstanceOf[Iterator[T]]
+      // Data not found in Block Manager, grab it from HDFS
+      case None =>
+        logInfo("Reading partition data from write ahead log " + partition.segment.path)
+        val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
+        val dataRead = reader.read(partition.segment)
+        reader.close()
+        // Currently, we support storing the data to BM only in serialized form and not in
+        // deserialized form
+        if (storeInBlockManager) {
+          blockManager.putBytes(blockId, dataRead, storageLevel)
+        }
+        dataRead.rewind()
+        blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+    }
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
+    val locations = getBlockIdLocations()
+    locations.getOrElse(partition.blockId,
--- End diff --

These nested getOrElse calls are too complicated for getOrElse to stay readable.

For the outermost layer, just use an if/else to make it clearer.
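A rough sketch of that shape; `getFileSegmentLocations` below is a hypothetical helper standing in for however the HDFS fallback is computed, since the quoted diff is cut off before that part:

override def getPreferredLocations(split: Partition): Seq[String] = {
  val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
  val blockLocations = getBlockIdLocations().get(partition.blockId)
  if (blockLocations.isDefined) {
    // Prefer the executors that already hold the block
    blockLocations.get
  } else {
    // Otherwise fall back to the HDFS locations of the log segment
    getFileSegmentLocations(partition.segment)  // hypothetical helper
  }
}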





[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454583
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala
 ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import java.io.File
--- End diff --

imports out of order
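For reference, grouping them in the usual Spark order (java, scala, third-party, then org.apache.spark) would look roughly like:

import java.io.File
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable.ArrayBuffer

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}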





[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454568
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala
 ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.collection.mutable.ArrayBuffer
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
+
+import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, 
StreamBlockId}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
WriteAheadLogWriter}
+
+class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with 
BeforeAndAfterAll {
+  val conf = new SparkConf()
+.setMaster("local[2]")
+.setAppName(this.getClass.getSimpleName)
+  val hadoopConf = new Configuration()
+  // Since the same BM is reused in all tests, use an atomic int to 
generate ids
+  val idGenerator = new AtomicInteger(0)
+  
+  var sparkContext: SparkContext = null
+  var blockManager: BlockManager = null
+  var file: File = null
+  var dir: File = null
+
+  before {
+blockManager = sparkContext.env.blockManager
+dir = Files.createTempDir()
+file = new File(dir, "BlockManagerWrite")
+  }
+
+  after {
+file.delete()
+dir.delete()
+  }
+
+  override def beforeAll(): Unit = {
+sparkContext = new SparkContext(conf)
+  }
+
+  override def afterAll(): Unit = {
+// Copied from LocalSparkContext which can't be imported since 
spark-core test-jar does not
+// get imported properly by sbt even if it is created.
+sparkContext.stop()
+System.clearProperty("spark.driver.port")
+  }
+
+  test("Data available in BM and HDFS") {
+testHDFSBackedRDD(5, 5, 20, 5)
+  }
+
+  test("Data available in in BM but not in HDFS") {
+testHDFSBackedRDD(5, 0, 20, 5)
+  }
+
+  test("Data available in in HDFS and not in BM") {
+testHDFSBackedRDD(0, 5, 20, 5)
+  }
+
+  test("Data partially available in BM, and the rest in HDFS") {
+testHDFSBackedRDD(3, 2, 20, 5)
+  }
+
+  /**
+   * Write a bunch of events into the HDFS Block RDD. Put a part of all of 
them to the
+   * BlockManager, so all reads need not happen from HDFS.
+   * @param total Total number of Strings to write
+   * @param blockCount Number of blocks to write (therefore, total # of 
events per block =
+   *   total/blockCount
+   */
+  private def testHDFSBackedRDD(
--- End diff --

Basically I'm worried that the test case is too complicated for others to understand without more comments, and that it will be modified incorrectly in the future.





[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454543
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala
 ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.collection.mutable.ArrayBuffer
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
+
+import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, 
StreamBlockId}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
WriteAheadLogWriter}
+
+class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with 
BeforeAndAfterAll {
+  val conf = new SparkConf()
+.setMaster("local[2]")
+.setAppName(this.getClass.getSimpleName)
+  val hadoopConf = new Configuration()
+  // Since the same BM is reused in all tests, use an atomic int to 
generate ids
+  val idGenerator = new AtomicInteger(0)
+  
+  var sparkContext: SparkContext = null
+  var blockManager: BlockManager = null
+  var file: File = null
+  var dir: File = null
+
+  before {
+blockManager = sparkContext.env.blockManager
+dir = Files.createTempDir()
+file = new File(dir, "BlockManagerWrite")
+  }
+
+  after {
+file.delete()
+dir.delete()
+  }
+
+  override def beforeAll(): Unit = {
+sparkContext = new SparkContext(conf)
+  }
+
+  override def afterAll(): Unit = {
+// Copied from LocalSparkContext which can't be imported since 
spark-core test-jar does not
+// get imported properly by sbt even if it is created.
+sparkContext.stop()
+System.clearProperty("spark.driver.port")
+  }
+
+  test("Data available in BM and HDFS") {
+testHDFSBackedRDD(5, 5, 20, 5)
+  }
+
+  test("Data available in in BM but not in HDFS") {
+testHDFSBackedRDD(5, 0, 20, 5)
+  }
+
+  test("Data available in in HDFS and not in BM") {
+testHDFSBackedRDD(0, 5, 20, 5)
+  }
+
+  test("Data partially available in BM, and the rest in HDFS") {
+testHDFSBackedRDD(3, 2, 20, 5)
+  }
+
+  /**
+   * Write a bunch of events into the HDFS Block RDD. Put a part of all of 
them to the
+   * BlockManager, so all reads need not happen from HDFS.
+   * @param total Total number of Strings to write
+   * @param blockCount Number of blocks to write (therefore, total # of 
events per block =
+   *   total/blockCount
+   */
+  private def testHDFSBackedRDD(
+  writeToBMCount: Int,
+  writeToHDFSCount: Int,
+  total: Int,
+  blockCount: Int
+) {
+val countPerBlock = total / blockCount
+val blockIds = (0 until blockCount).map { i =>
+StreamBlockId(idGenerator.incrementAndGet(), 
idGenerator.incrementAndGet())
+}
+
+val writtenStrings = generateData(total, countPerBlock)
+
+if (writeToBMCount != 0) {
+  (0 until writeToBMCount).foreach { i =>
+blockManager
+  .putIterator(blockIds(i), writtenStrings(i).iterator, 
StorageLevel.MEMORY_ONLY_SER)
+  }
+}
+
+val segments = {
+  if (writeToHDFSCount != 0) {
+// Generate some fake segments for the blocks in BM so the RDD 
does not complain
+generateFakeSegments(writeToBMCount) ++
+  writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount),
+blockIds.slice(writeToBMCount, blockCount))
+  } else {
+generateFakeSegments(blockCount)
+  }
+}
+
+val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, 

[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454541
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala
 ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.collection.mutable.ArrayBuffer
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
+
+import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, 
StreamBlockId}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
WriteAheadLogWriter}
+
+class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with 
BeforeAndAfterAll {
+  val conf = new SparkConf()
+.setMaster("local[2]")
+.setAppName(this.getClass.getSimpleName)
+  val hadoopConf = new Configuration()
+  // Since the same BM is reused in all tests, use an atomic int to 
generate ids
+  val idGenerator = new AtomicInteger(0)
+  
+  var sparkContext: SparkContext = null
+  var blockManager: BlockManager = null
+  var file: File = null
+  var dir: File = null
+
+  before {
+blockManager = sparkContext.env.blockManager
+dir = Files.createTempDir()
+file = new File(dir, "BlockManagerWrite")
+  }
+
+  after {
+file.delete()
+dir.delete()
+  }
+
+  override def beforeAll(): Unit = {
+sparkContext = new SparkContext(conf)
+  }
+
+  override def afterAll(): Unit = {
+// Copied from LocalSparkContext which can't be imported since 
spark-core test-jar does not
+// get imported properly by sbt even if it is created.
+sparkContext.stop()
+System.clearProperty("spark.driver.port")
+  }
+
+  test("Data available in BM and HDFS") {
+testHDFSBackedRDD(5, 5, 20, 5)
+  }
+
+  test("Data available in in BM but not in HDFS") {
+testHDFSBackedRDD(5, 0, 20, 5)
+  }
+
+  test("Data available in in HDFS and not in BM") {
+testHDFSBackedRDD(0, 5, 20, 5)
+  }
+
+  test("Data partially available in BM, and the rest in HDFS") {
+testHDFSBackedRDD(3, 2, 20, 5)
+  }
+
+  /**
+   * Write a bunch of events into the HDFS Block RDD. Put a part of all of 
them to the
+   * BlockManager, so all reads need not happen from HDFS.
+   * @param total Total number of Strings to write
+   * @param blockCount Number of blocks to write (therefore, total # of 
events per block =
+   *   total/blockCount
+   */
+  private def testHDFSBackedRDD(
+  writeToBMCount: Int,
+  writeToHDFSCount: Int,
+  total: Int,
+  blockCount: Int
+) {
+val countPerBlock = total / blockCount
+val blockIds = (0 until blockCount).map { i =>
+StreamBlockId(idGenerator.incrementAndGet(), 
idGenerator.incrementAndGet())
+}
+
+val writtenStrings = generateData(total, countPerBlock)
+
+if (writeToBMCount != 0) {
+  (0 until writeToBMCount).foreach { i =>
+blockManager
+  .putIterator(blockIds(i), writtenStrings(i).iterator, 
StorageLevel.MEMORY_ONLY_SER)
+  }
+}
+
+val segments = {
+  if (writeToHDFSCount != 0) {
+// Generate some fake segments for the blocks in BM so the RDD 
does not complain
+generateFakeSegments(writeToBMCount) ++
+  writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount),
+blockIds.slice(writeToBMCount, blockCount))
+  } else {
+generateFakeSegments(blockCount)
+  }
+}
+
+val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, 

[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454523
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala
 ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.collection.mutable.ArrayBuffer
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
+
+import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, 
StreamBlockId}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
WriteAheadLogWriter}
+
+class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with 
BeforeAndAfterAll {
+  val conf = new SparkConf()
+.setMaster("local[2]")
+.setAppName(this.getClass.getSimpleName)
+  val hadoopConf = new Configuration()
+  // Since the same BM is reused in all tests, use an atomic int to 
generate ids
+  val idGenerator = new AtomicInteger(0)
+  
+  var sparkContext: SparkContext = null
+  var blockManager: BlockManager = null
+  var file: File = null
+  var dir: File = null
+
+  before {
+blockManager = sparkContext.env.blockManager
+dir = Files.createTempDir()
+file = new File(dir, "BlockManagerWrite")
+  }
+
+  after {
+file.delete()
+dir.delete()
+  }
+
+  override def beforeAll(): Unit = {
+sparkContext = new SparkContext(conf)
+  }
+
+  override def afterAll(): Unit = {
+// Copied from LocalSparkContext which can't be imported since 
spark-core test-jar does not
+// get imported properly by sbt even if it is created.
+sparkContext.stop()
+System.clearProperty("spark.driver.port")
+  }
+
+  test("Data available in BM and HDFS") {
+testHDFSBackedRDD(5, 5, 20, 5)
+  }
+
+  test("Data available in in BM but not in HDFS") {
+testHDFSBackedRDD(5, 0, 20, 5)
+  }
+
+  test("Data available in in HDFS and not in BM") {
+testHDFSBackedRDD(0, 5, 20, 5)
+  }
+
+  test("Data partially available in BM, and the rest in HDFS") {
+testHDFSBackedRDD(3, 2, 20, 5)
+  }
+
+  /**
+   * Write a bunch of events into the HDFS Block RDD. Put a part of all of 
them to the
+   * BlockManager, so all reads need not happen from HDFS.
+   * @param total Total number of Strings to write
+   * @param blockCount Number of blocks to write (therefore, total # of 
events per block =
+   *   total/blockCount
+   */
+  private def testHDFSBackedRDD(
--- End diff --

You should explain more clearly what this test function does. It is long enough that its behavior is no longer obvious, and the current comment doesn't really address that.





[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454515
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala
 ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.collection.mutable.ArrayBuffer
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
+
+import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, 
StreamBlockId}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
WriteAheadLogWriter}
+
+class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with 
BeforeAndAfterAll {
+  val conf = new SparkConf()
+.setMaster("local[2]")
+.setAppName(this.getClass.getSimpleName)
+  val hadoopConf = new Configuration()
+  // Since the same BM is reused in all tests, use an atomic int to 
generate ids
+  val idGenerator = new AtomicInteger(0)
+  
+  var sparkContext: SparkContext = null
+  var blockManager: BlockManager = null
+  var file: File = null
+  var dir: File = null
+
+  before {
+blockManager = sparkContext.env.blockManager
+dir = Files.createTempDir()
+file = new File(dir, "BlockManagerWrite")
+  }
+
+  after {
+file.delete()
+dir.delete()
+  }
+
+  override def beforeAll(): Unit = {
+sparkContext = new SparkContext(conf)
+  }
+
+  override def afterAll(): Unit = {
+// Copied from LocalSparkContext which can't be imported since 
spark-core test-jar does not
+// get imported properly by sbt even if it is created.
+sparkContext.stop()
+System.clearProperty("spark.driver.port")
+  }
+
+  test("Data available in BM and HDFS") {
+testHDFSBackedRDD(5, 5, 20, 5)
+  }
+
+  test("Data available in in BM but not in HDFS") {
+testHDFSBackedRDD(5, 0, 20, 5)
+  }
+
+  test("Data available in in HDFS and not in BM") {
+testHDFSBackedRDD(0, 5, 20, 5)
+  }
+
+  test("Data partially available in BM, and the rest in HDFS") {
+testHDFSBackedRDD(3, 2, 20, 5)
+  }
+
+  /**
+   * Write a bunch of events into the HDFS Block RDD. Put a part of all of 
them to the
--- End diff --

What do you mean by "a part of all of them"?





[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454502
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala
 ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.collection.mutable.ArrayBuffer
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
+
+import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, 
StreamBlockId}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
WriteAheadLogWriter}
+
+class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with 
BeforeAndAfterAll {
+  val conf = new SparkConf()
+.setMaster("local[2]")
+.setAppName(this.getClass.getSimpleName)
+  val hadoopConf = new Configuration()
+  // Since the same BM is reused in all tests, use an atomic int to 
generate ids
+  val idGenerator = new AtomicInteger(0)
+  
+  var sparkContext: SparkContext = null
+  var blockManager: BlockManager = null
+  var file: File = null
+  var dir: File = null
+
+  before {
+blockManager = sparkContext.env.blockManager
+dir = Files.createTempDir()
+file = new File(dir, "BlockManagerWrite")
+  }
+
+  after {
+file.delete()
+dir.delete()
+  }
+
+  override def beforeAll(): Unit = {
+sparkContext = new SparkContext(conf)
+  }
+
+  override def afterAll(): Unit = {
+// Copied from LocalSparkContext which can't be imported since 
spark-core test-jar does not
+// get imported properly by sbt even if it is created.
+sparkContext.stop()
+System.clearProperty("spark.driver.port")
+  }
+
+  test("Data available in BM and HDFS") {
+testHDFSBackedRDD(5, 5, 20, 5)
+  }
+
+  test("Data available in in BM but not in HDFS") {
+testHDFSBackedRDD(5, 0, 20, 5)
+  }
+
+  test("Data available in in HDFS and not in BM") {
+testHDFSBackedRDD(0, 5, 20, 5)
+  }
+
+  test("Data partially available in BM, and the rest in HDFS") {
+testHDFSBackedRDD(3, 2, 20, 5)
+  }
+
+  /**
+   * Write a bunch of events into the HDFS Block RDD. Put a part of all of 
them to the
+   * BlockManager, so all reads need not happen from HDFS.
+   * @param total Total number of Strings to write
+   * @param blockCount Number of blocks to write (therefore, total # of 
events per block =
+   *   total/blockCount
+   */
+  private def testHDFSBackedRDD(
+  writeToBMCount: Int,
+  writeToHDFSCount: Int,
+  total: Int,
+  blockCount: Int
+) {
--- End diff --

move this to the previous line?





[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454508
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala
 ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.collection.mutable.ArrayBuffer
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
+
+import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, 
StreamBlockId}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
WriteAheadLogWriter}
+
+class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with 
BeforeAndAfterAll {
+  val conf = new SparkConf()
+.setMaster("local[2]")
+.setAppName(this.getClass.getSimpleName)
+  val hadoopConf = new Configuration()
+  // Since the same BM is reused in all tests, use an atomic int to 
generate ids
+  val idGenerator = new AtomicInteger(0)
+  
+  var sparkContext: SparkContext = null
+  var blockManager: BlockManager = null
+  var file: File = null
+  var dir: File = null
+
+  before {
+blockManager = sparkContext.env.blockManager
+dir = Files.createTempDir()
+file = new File(dir, "BlockManagerWrite")
+  }
+
+  after {
+file.delete()
+dir.delete()
+  }
+
+  override def beforeAll(): Unit = {
+sparkContext = new SparkContext(conf)
+  }
+
+  override def afterAll(): Unit = {
+// Copied from LocalSparkContext which can't be imported since 
spark-core test-jar does not
+// get imported properly by sbt even if it is created.
+sparkContext.stop()
+System.clearProperty("spark.driver.port")
+  }
+
+  test("Data available in BM and HDFS") {
+testHDFSBackedRDD(5, 5, 20, 5)
+  }
+
+  test("Data available in in BM but not in HDFS") {
+testHDFSBackedRDD(5, 0, 20, 5)
+  }
+
+  test("Data available in in HDFS and not in BM") {
+testHDFSBackedRDD(0, 5, 20, 5)
+  }
+
+  test("Data partially available in BM, and the rest in HDFS") {
+testHDFSBackedRDD(3, 2, 20, 5)
+  }
+
+  /**
+   * Write a bunch of events into the HDFS Block RDD. Put a part of all of 
them to the
+   * BlockManager, so all reads need not happen from HDFS.
+   * @param total Total number of Strings to write
+   * @param blockCount Number of blocks to write (therefore, total # of 
events per block =
+   *   total/blockCount
+   */
+  private def testHDFSBackedRDD(
+  writeToBMCount: Int,
+  writeToHDFSCount: Int,
+  total: Int,
+  blockCount: Int
+) {
+val countPerBlock = total / blockCount
+val blockIds = (0 until blockCount).map { i =>
--- End diff --

Use Array.tabulate instead
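That is, something like:

// Sketch: generate the test block ids directly into an Array
val blockIds = Array.tabulate(blockCount) { _ =>
  StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet())
}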





[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2931#discussion_r19454510
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala
 ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.collection.mutable.ArrayBuffer
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
+
+import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, 
StreamBlockId}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, 
WriteAheadLogWriter}
+
+class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with 
BeforeAndAfterAll {
+  val conf = new SparkConf()
+.setMaster("local[2]")
+.setAppName(this.getClass.getSimpleName)
+  val hadoopConf = new Configuration()
+  // Since the same BM is reused in all tests, use an atomic int to 
generate ids
+  val idGenerator = new AtomicInteger(0)
+  
+  var sparkContext: SparkContext = null
+  var blockManager: BlockManager = null
+  var file: File = null
+  var dir: File = null
+
+  before {
+blockManager = sparkContext.env.blockManager
+dir = Files.createTempDir()
+file = new File(dir, "BlockManagerWrite")
+  }
+
+  after {
+file.delete()
+dir.delete()
+  }
+
+  override def beforeAll(): Unit = {
+sparkContext = new SparkContext(conf)
+  }
+
+  override def afterAll(): Unit = {
+// Copied from LocalSparkContext which can't be imported since 
spark-core test-jar does not
+// get imported properly by sbt even if it is created.
+sparkContext.stop()
+System.clearProperty("spark.driver.port")
+  }
+
+  test("Data available in BM and HDFS") {
+testHDFSBackedRDD(5, 5, 20, 5)
+  }
+
+  test("Data available in in BM but not in HDFS") {
+testHDFSBackedRDD(5, 0, 20, 5)
+  }
+
+  test("Data available in in HDFS and not in BM") {
+testHDFSBackedRDD(0, 5, 20, 5)
+  }
+
+  test("Data partially available in BM, and the rest in HDFS") {
+testHDFSBackedRDD(3, 2, 20, 5)
+  }
+
+  /**
+   * Write a bunch of events into the HDFS Block RDD. Put a part of all of 
them to the
+   * BlockManager, so all reads need not happen from HDFS.
+   * @param total Total number of Strings to write
+   * @param blockCount Number of blocks to write (therefore, total # of 
events per block =
+   *   total/blockCount
+   */
+  private def testHDFSBackedRDD(
+  writeToBMCount: Int,
+  writeToHDFSCount: Int,
+  total: Int,
+  blockCount: Int
+) {
+val countPerBlock = total / blockCount
+val blockIds = (0 until blockCount).map { i =>
+StreamBlockId(idGenerator.incrementAndGet(), 
idGenerator.incrementAndGet())
--- End diff --

indent off





[GitHub] spark pull request: [SPARK-4084] Reuse sort key in Sorter

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2937#discussion_r19454486
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1237,12 +1237,27 @@ private[spark] object Utils extends Logging {
   /**
* Timing method based on iterations that permit JVM JIT optimization.
* @param numIters number of iterations
-   * @param f function to be executed
+   * @param f function to be executed. If prepare is not None, the running 
time of each call to f
+   *  must be an order of magnitude longer than one millisecond 
for accurate timing.
+   * @param prepare function to be executed before each call to f. Its 
running time doesn't count.
*/
-  def timeIt(numIters: Int)(f: => Unit): Long = {
-val start = System.currentTimeMillis
-times(numIters)(f)
-System.currentTimeMillis - start
+  def timeIt(numIters: Int)(f: => Unit, prepare: Option[() => Unit] = 
None): Long = {
--- End diff --

done.
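
For context, here is a standalone sketch of the pattern under discussion: an optional `prepare` hook that runs before each timed call to `f` but is excluded from the measured time. This is an illustrative version only, not the actual `Utils.timeIt` implementation.

~~~
object TimeItSketch {
  def timeIt(numIters: Int)(f: => Unit, prepare: Option[() => Unit] = None): Long = {
    var elapsed = 0L
    var i = 0
    while (i < numIters) {
      prepare.foreach(p => p())               // setup work, not counted
      val start = System.currentTimeMillis
      f                                       // only this is timed
      elapsed += System.currentTimeMillis - start
      i += 1
    }
    elapsed
  }

  def main(args: Array[String]): Unit = {
    var data: Array[Int] = Array.empty
    val ms = timeIt(10)(
      java.util.Arrays.sort(data),
      prepare = Some(() => data = Array.fill(100000)(scala.util.Random.nextInt()))
    )
    println(s"sorting took $ms ms, excluding the time spent generating the input")
  }
}
~~~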





[GitHub] spark pull request: [SPARK-4084] Reuse sort key in Sorter

2014-10-27 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/2937#issuecomment-60713736
  
@aarondav Does `Jenkins` count?





[GitHub] spark pull request: [SPARK-4084] Reuse sort key in Sorter

2014-10-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2937#issuecomment-60713603
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22334/
Test PASSed.





[GitHub] spark pull request: [SPARK-4084] Reuse sort key in Sorter

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2937#issuecomment-60713601
  
  [Test build #22334 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22334/consoleFull)
 for   PR 2937 at commit 
[`0b7b682`](https://github.com/apache/spark/commit/0b7b682ea1fad80de08dbf1d63b56d3785f686ec).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2942#discussion_r19454435
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.clustering
+
+import breeze.linalg.{Vector => BV}
+
+import scala.reflect.ClassTag
+import scala.util.Random._
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.Logging
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.StreamingContext._
+
+/**
+ * :: DeveloperApi ::
+ *
+ * StreamingKMeansModel extends MLlib's KMeansModel for streaming
+ * algorithms, so it can keep track of the number of points assigned
+ * to each cluster, and also update the model by doing a single iteration
+ * of the standard KMeans algorithm.
+ *
+ * The update algorithm uses the "mini-batch" KMeans rule,
+ * generalized to incorporate forgetfulness (i.e. decay).
+ * The basic update rule (for each cluster) is:
+ *
+ * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
+ * n_t+1 = n_t + m_t
+ *
+ * Where c_t is the previously estimated centroid for that cluster,
+ * n_t is the number of points assigned to it thus far, x_t is the centroid
+ * estimated on the current batch, and m_t is the number of points assigned
+ * to that centroid in the current batch.
+ *
+ * This update rule is modified with a decay factor 'a' that scales
+ * the contribution of the clusters as estimated thus far.
+ * If a=1, all batches are weighted equally. If a=0, new centroids
+ * are determined entirely by recent data. Lower values correspond to
+ * more forgetting.
+ *
+ * Decay can optionally be specified as a decay fraction 'q',
+ * which corresponds to the fraction of batches (or points)
+ * after which the past will be reduced to a contribution of 0.5.
+ * This decay fraction can be specified in units of 'points' or 'batches'.
+ * if 'batches', behavior will be independent of the number of points per 
batch;
+ * if 'points', the expected number of points per batch must be specified.
+ *
+ * Use a builder pattern to construct a streaming KMeans analysis
+ * in an application, like:
+ *
+ *  val model = new StreamingKMeans()
+ *.setDecayFactor(0.5)
+ *.setK(3)
+ *.setRandomCenters(5)
+ *.trainOn(DStream)
+ *
+ */
+@DeveloperApi
+class StreamingKMeansModel(
+override val clusterCenters: Array[Vector],
+val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) 
with Logging {
+
+  // do a sequential KMeans update on a batch of data
+  def update(data: RDD[Vector], a: Double, units: String): 
StreamingKMeansModel = {
+
+val centers = clusterCenters
+val counts = clusterCounts
+
+// find nearest cluster to each point
+val closest = data.map(point => (this.predict(point), (point.toBreeze, 
1.toLong)))
+
+// get sums and counts for updating each cluster
+type WeightedPoint = (BV[Double], Long)
+def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint 
= {
+  (p1._1 += p2._1, p1._2 + p2._2)
+}
+val pointStats: Array[(Int, (BV[Double], Long))] =
+  closest.reduceByKey{mergeContribs}.collectAsMap().toArray
+
+// implement update rule
+for (newP <- pointStats) {
+  // store old count and centroid
+  val oldCount = counts(newP._1)
+  val oldCentroid = centers(newP._1).toBreeze
+  // get new count and centroid
+  val newCount = newP._2._2
+  val newCentroid = newP._2._1 / newCount.toDouble
+  // compute the normalized scale factor that controls forgetting
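
Restating the update rule from the scaladoc above in standard notation (the decay-weighted form below is our reading of how the factor a is described, not a quote of the implementation):

~~~
c_{t+1} = \frac{c_t\, n_t + x_t\, m_t}{n_t + m_t}, \qquad n_{t+1} = n_t + m_t
~~~

and, with the decay factor a applied to the history,

~~~
c_{t+1} = \frac{a\, c_t\, n_t + x_t\, m_t}{a\, n_t + m_t}
~~~

so a = 1 weights all batches equally and a = 0 makes the new centroid depend only on the current batch.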

[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2942#discussion_r19454416
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.clustering
+
+import breeze.linalg.{Vector => BV}
+
+import scala.reflect.ClassTag
+import scala.util.Random._
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.Logging
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.StreamingContext._
+
+/**
+ * :: DeveloperApi ::
+ *
+ * StreamingKMeansModel extends MLlib's KMeansModel for streaming
+ * algorithms, so it can keep track of the number of points assigned
+ * to each cluster, and also update the model by doing a single iteration
+ * of the standard KMeans algorithm.
+ *
+ * The update algorithm uses the "mini-batch" KMeans rule,
+ * generalized to incorporate forgetfulness (i.e. decay).
+ * The basic update rule (for each cluster) is:
+ *
+ * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
+ * n_t+1 = n_t + m_t
+ *
+ * Where c_t is the previously estimated centroid for that cluster,
+ * n_t is the number of points assigned to it thus far, x_t is the centroid
+ * estimated on the current batch, and m_t is the number of points assigned
+ * to that centroid in the current batch.
+ *
+ * This update rule is modified with a decay factor 'a' that scales
+ * the contribution of the clusters as estimated thus far.
+ * If a=1, all batches are weighted equally. If a=0, new centroids
+ * are determined entirely by recent data. Lower values correspond to
+ * more forgetting.
+ *
+ * Decay can optionally be specified as a decay fraction 'q',
+ * which corresponds to the fraction of batches (or points)
+ * after which the past will be reduced to a contribution of 0.5.
+ * This decay fraction can be specified in units of 'points' or 'batches'.
+ * if 'batches', behavior will be independent of the number of points per 
batch;
+ * if 'points', the expected number of points per batch must be specified.
+ *
+ * Use a builder pattern to construct a streaming KMeans analysis
+ * in an application, like:
+ *
+ *  val model = new StreamingKMeans()
+ *.setDecayFactor(0.5)
+ *.setK(3)
+ *.setRandomCenters(5)
+ *.trainOn(DStream)
+ *
+ */
+@DeveloperApi
+class StreamingKMeansModel(
+override val clusterCenters: Array[Vector],
+val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) 
with Logging {
+
+  // do a sequential KMeans update on a batch of data
+  def update(data: RDD[Vector], a: Double, units: String): 
StreamingKMeansModel = {
+
+val centers = clusterCenters
+val counts = clusterCounts
+
+// find nearest cluster to each point
+val closest = data.map(point => (this.predict(point), (point.toBreeze, 
1.toLong)))
+
+// get sums and counts for updating each cluster
+type WeightedPoint = (BV[Double], Long)
+def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint 
= {
+  (p1._1 += p2._1, p1._2 + p2._2)
+}
+val pointStats: Array[(Int, (BV[Double], Long))] =
+  closest.reduceByKey{mergeContribs}.collectAsMap().toArray
+
+// implement update rule
+for (newP <- pointStats) {
+  // store old count and centroid
+  val oldCount = counts(newP._1)
+  val oldCentroid = centers(newP._1).toBreeze
+  // get new count and centroid
+  val newCount = newP._2._2
+  val newCentroid = newP._2._1 / newCount.toDouble
+  // compute the normalized scale factor that controls forgetting

[GitHub] spark pull request: SPARK-3968 Use parquet-mr filter2 api in spark...

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2841#issuecomment-60713375
  
  [Test build #22339 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22339/consoleFull)
 for   PR 2841 at commit 
[`515df1c`](https://github.com/apache/spark/commit/515df1cc70fec520828f45fb4b52a2643a6e7dcb).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3954][Streaming] promote the speed of c...

2014-10-27 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/2811#issuecomment-60713367
  
Maybe they are quite busy; let me ping @tdas.





[GitHub] spark pull request: [SPARK-4110] Wrong comments about default sett...

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2972#issuecomment-60713359
  
  [Test build #22338 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22338/consoleFull)
 for   PR 2972 at commit 
[`5a171a2`](https://github.com/apache/spark/commit/5a171a2ff89ee1bcfbf8458e523c9277a2fe041d).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

2014-10-27 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/2953#issuecomment-60713308
  
Can you write a short high-level design doc for this change and attach it to the JIRA?





[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...

2014-10-27 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2969#discussion_r19454387
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TachyonStore.scala 
---
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
   return None
 }
 val is = file.getInStream(ReadType.CACHE)
-var buffer: ByteBuffer = null
+assert (is != null)
 try {
-  if (is != null) {
-val size = file.length
-val bs = new Array[Byte](size.asInstanceOf[Int])
-val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
-buffer = ByteBuffer.wrap(bs)
-if (fetchSize != size) {
-  logWarning(s"Failed to fetch the block $blockId from Tachyon: 
Size $size " +
-s"is not equal to fetched size $fetchSize")
-  return None
-}
-  }
+  val size = file.length
+  val bs = new Array[Byte](size.asInstanceOf[Int])
+  ByteStreams.readFully(is, bs)
+  Some(ByteBuffer.wrap(bs))
 } catch {
--- End diff --

It's not generally safe to recover from OOM, so why would we want to catch 
it here?
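
For readers following along, a sketch of the narrower handling being argued for here: treat I/O failures as a missing block, but let fatal errors such as OutOfMemoryError propagate. This is illustrative only, not the actual TachyonStore code.

~~~
import java.io.{IOException, InputStream}
import java.nio.ByteBuffer
import com.google.common.io.ByteStreams

object ReadBlockSketch {
  def readFullyAsBuffer(is: InputStream, size: Int): Option[ByteBuffer] = {
    try {
      val bytes = new Array[Byte](size)
      ByteStreams.readFully(is, bytes)   // throws EOFException on a short read
      Some(ByteBuffer.wrap(bytes))
    } catch {
      case e: IOException =>
        // short read, corrupt file, etc.: report the block as unavailable
        None
    } finally {
      is.close()
    }
  }
}
~~~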





[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...

2014-10-27 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/2969#discussion_r19454354
  
--- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala 
---
@@ -106,7 +112,7 @@ final class FileSegmentManagedBuffer(val file: File, 
val offset: Long, val lengt
 var is: FileInputStream = null
 try {
   is = new FileInputStream(file)
-  is.skip(offset)
--- End diff --


http://docs.oracle.com/javase/7/docs/api/java/io/FileInputStream.html#skip(long):

> Skips over and discards n bytes of data from the input stream.
>
> The skip method may, for a variety of reasons, end up skipping over some 
smaller number of bytes, possibly 0. If n is negative, an IOException is 
thrown, even though the skip method of the InputStream superclass does nothing 
in this case. The actual number of bytes skipped is returned.
>
> This method may skip more bytes than are remaining in the backing file. 
This produces no exception and the number of bytes skipped may include some 
number of bytes that were beyond the EOF of the backing file. Attempting to 
read from the stream after skipping past the end will result in -1 indicating 
the end of the file.
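
In other words, a single skip() call cannot be relied on to consume the whole offset. A sketch of the kind of fix being discussed, using Guava's ByteStreams.skipFully as one possible option (this is not the actual patch):

~~~
import java.io.{FileInputStream, IOException}
import com.google.common.io.ByteStreams

object SkipFullySketch {
  // Open a file positioned at `offset`, failing loudly if the file is shorter than
  // the offset instead of silently reading from the wrong position.
  def openAtOffset(path: String, offset: Long): FileInputStream = {
    val is = new FileInputStream(path)
    try {
      ByteStreams.skipFully(is, offset)   // loops internally; throws EOFException if too short
      is
    } catch {
      case e: IOException =>
        is.close()
        throw e
    }
  }
}
~~~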





[GitHub] spark pull request: SPARK-3968 Use parquet-mr filter2 api in spark...

2014-10-27 Thread saucam
Github user saucam commented on the pull request:

https://github.com/apache/spark/pull/2841#issuecomment-60713180
  
Added a unit test for filter pushdown on an optional column.





[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-27 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60713165
  
Here's one number. Note that this requires constantly persisting new node Id caches 
and unpersisting the old ones, which is not reflected in the code yet. I'm not sure 
whether frequently persisting a new RDD derived from a previously persisted RDD is 
a cheap operation, but at least on this data set it seems fast. Let me know if 
you know more about the persistence mechanism.

mnist dataset, 750 columns with 6 rows (only two partitions). 8 
executors. 10-class classification. 100 trees trained, 30 max depth. Gini. With 
the default fraction testing.

Without node-id caching, it took 24 minutes 34 seconds.
With node-id caching, persisting the cache every two iterations, it took 
16 minutes 42 seconds.

So we see noticeable benefits, as long as we frequently recache the node Id 
cache.
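
A hypothetical sketch of the "recache every few iterations" pattern described above; the names and the dummy update step are made up for illustration and this is not the PR's code:

~~~
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object NodeIdCacheSketch {
  def step(prev: RDD[Int], iteration: Int, recacheEvery: Int): RDD[Int] = {
    val next = prev.map(_ + 1)                  // stand-in for the real node-id update
    if (iteration % recacheEvery == 0) {
      next.persist(StorageLevel.MEMORY_AND_DISK)
      next.count()                              // materialize before dropping the old cache
      prev.unpersist(blocking = false)
    }
    next
  }
}
~~~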






[GitHub] spark pull request: [SPARK-3954][Streaming] promote the speed of c...

2014-10-27 Thread surq
Github user surq commented on the pull request:

https://github.com/apache/spark/pull/2811#issuecomment-60713102
  
@jerryshao: Is this patch considered unimportant? Why has no maintainer merged it yet?





[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2969#discussion_r19454325
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TachyonStore.scala 
---
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
   return None
 }
 val is = file.getInStream(ReadType.CACHE)
-var buffer: ByteBuffer = null
+assert (is != null)
 try {
-  if (is != null) {
-val size = file.length
-val bs = new Array[Byte](size.asInstanceOf[Int])
-val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
-buffer = ByteBuffer.wrap(bs)
-if (fetchSize != size) {
-  logWarning(s"Failed to fetch the block $blockId from Tachyon: 
Size $size " +
-s"is not equal to fetched size $fetchSize")
-  return None
-}
-  }
+  val size = file.length
+  val bs = new Array[Byte](size.asInstanceOf[Int])
+  ByteStreams.readFully(is, bs)
+  Some(ByteBuffer.wrap(bs))
 } catch {
--- End diff --

You probably want to catch other exceptions/errors as well; e.g. in this case I'd 
also try to catch out-of-memory errors.





[GitHub] spark pull request: [SPARK-4110] Wrong comments about default sett...

2014-10-27 Thread sarutak
GitHub user sarutak opened a pull request:

https://github.com/apache/spark/pull/2972

[SPARK-4110] Wrong comments about default settings in spark-daemon.sh

In spark-daemon.sh, there are the following comments.


#   SPARK_CONF_DIR  Alternate conf dir. Default is ${SPARK_PREFIX}/conf.
#   SPARK_LOG_DIR   Where log files are stored.  PWD by default.

But I think the default value for SPARK_CONF_DIR is `${SPARK_HOME}/conf`, 
and for SPARK_LOG_DIR it is `${SPARK_HOME}/logs`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sarutak/spark SPARK-4110

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2972.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2972


commit 5a171a2ff89ee1bcfbf8458e523c9277a2fe041d
Author: Kousuke Saruta 
Date:   2014-10-28T05:50:42Z

Fixed wrong comments







[GitHub] spark pull request: [SPARK-4109][CORE] Correctly deserialize Task....

2014-10-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2971#issuecomment-60712916
  
Can one of the admins verify this patch?





[GitHub] spark pull request: [SPARK-4109] Correctly deserialize Task.stageI...

2014-10-27 Thread luluorta
GitHub user luluorta opened a pull request:

https://github.com/apache/spark/pull/2971

[SPARK-4109] Correctly deserialize Task.stageId

The two subclasses of Task, ShuffleMapTask and ResultTask, do not correctly 
deserialize stageId. As a result, accessing TaskContext.stageId always 
returns zero to the user.
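
A contrived, self-contained illustration of the general failure mode described here: custom serialization that never round-trips a superclass field, so after deserialization the field silently holds the no-arg-constructor default. This is not Spark's actual Task/ShuffleMapTask code.

~~~
import java.io._

abstract class TaskLike(var stageId: Int) extends Serializable

class BrokenTask(stageId: Int, var partitionId: Int)
  extends TaskLike(stageId) with Externalizable {
  def this() = this(0, 0)                                   // required by Externalizable
  override def writeExternal(out: ObjectOutput): Unit = out.writeInt(partitionId)   // stageId never written
  override def readExternal(in: ObjectInput): Unit = { partitionId = in.readInt() } // stageId stays 0
}

object RoundTrip {
  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(buf)
    oos.writeObject(new BrokenTask(stageId = 7, partitionId = 3))
    oos.close()
    val ois = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    val copy = ois.readObject().asInstanceOf[BrokenTask]
    println(s"stageId=${copy.stageId}, partitionId=${copy.partitionId}")  // prints stageId=0, partitionId=3
  }
}
~~~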

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/luluorta/spark fix-task-ser

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2971.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2971


commit ff35ee6522743989fb85ae5382f28e040af9d8a1
Author: luluorta 
Date:   2014-10-28T05:45:08Z

correctly deserialize Task.stageId







[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...

2014-10-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2969#discussion_r19454231
  
--- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala 
---
@@ -106,7 +112,7 @@ final class FileSegmentManagedBuffer(val file: File, 
val offset: Long, val lengt
 var is: FileInputStream = null
 try {
   is = new FileInputStream(file)
-  is.skip(offset)
--- End diff --

what is the problem with the old code here?





[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19454193
  
--- Diff: python/pyspark/mllib/feature.py ---
@@ -95,33 +385,26 @@ class Word2Vec(object):
 >>> localDoc = [sentence, sentence]
 >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
 >>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc)
+
 >>> syms = model.findSynonyms("a", 2)
->>> str(syms[0][0])
-'b'
->>> str(syms[1][0])
-'c'
->>> len(syms)
-2
+>>> [s[0] for s in syms]
+[u'b', u'c']
 >>> vec = model.transform("a")
->>> len(vec)
-10
 >>> syms = model.findSynonyms(vec, 2)
->>> str(syms[0][0])
-'b'
->>> str(syms[1][0])
-'c'
->>> len(syms)
-2
+>>> [s[0] for s in syms]
+[u'b', u'c']
 """
 def __init__(self):
 """
 Construct Word2Vec instance
 """
+import random  # this can't be on the top because of mllib.random
+
 self.vectorSize = 100
 self.learningRate = 0.025
 self.numPartitions = 1
 self.numIterations = 1
-self.seed = 42L
+self.seed = random.randint(0, sys.maxint)
--- End diff --

`sys.maxint` -> `2 ** 32 - 1` (see #2889)





[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19454180
  
--- Diff: python/pyspark/mllib/feature.py ---
@@ -18,59 +18,348 @@
 """
 Python package for feature in MLlib.
 """
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+   'HashTF', 'IDFModel', 'IDF',
+   'Word2Vec', 'Word2VecModel']
+
+
+# Hack for support float('inf') in Py4j
+old_smart_decode = py4j.protocol.smart_decode
--- End diff --

Shall we use underscore for those private vars and functions: 
`old_smart_decode`, `float_str_mapping`, `new_smart_decode`?





[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19454191
  
--- Diff: python/pyspark/mllib/feature.py ---
@@ -18,59 +18,348 @@
 """
 Python package for feature in MLlib.
 """
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+   'HashTF', 'IDFModel', 'IDF',
+   'Word2Vec', 'Word2VecModel']
+
+
+# Hack for support float('inf') in Py4j
+old_smart_decode = py4j.protocol.smart_decode
+
+float_str_mapping = {
+u'nan': u'NaN',
+u'inf': u'Infinity',
+u'-inf': u'-Infinity',
+}
+
+
+def new_smart_decode(obj):
+if isinstance(obj, float):
+s = unicode(obj)
+return float_str_mapping.get(s, s)
+return old_smart_decode(obj)
+
+py4j.protocol.smart_decode = new_smart_decode
+
+
+# TODO: move these helper functions into utils
+_picklable_classes = [
+'LinkedList',
+'SparseVector',
+'DenseVector',
+'DenseMatrix',
+'Rating',
+'LabeledPoint',
+]
+
+
+def _py2java(sc, a):
+""" Convert Python object into Java """
+if isinstance(a, RDD):
+a = _to_java_object_rdd(a)
+elif not isinstance(a, (int, long, float, bool, basestring)):
+bytes = bytearray(PickleSerializer().dumps(a))
+a = sc._jvm.SerDe.loads(bytes)
+return a
+
+
+def _java2py(sc, r):
+if isinstance(r, JavaObject):
+clsName = r.getClass().getSimpleName()
+if clsName in ("RDD", "JavaRDD"):
+if clsName == "RDD":
+r = r.toJavaRDD()
+jrdd = sc._jvm.SerDe.javaToPython(r)
+return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
 
-__all__ = ['Word2Vec', 'Word2VecModel']
+elif clsName in _picklable_classes:
+r = sc._jvm.SerDe.dumps(r)
 
+if isinstance(r, bytearray):
+r = PickleSerializer().loads(str(r))
+return r
 
-class Word2VecModel(object):
+
+def _callJavaFunc(sc, func, *args):
+""" Call Java Function
 """
-class for Word2Vec model
+args = [_py2java(sc, a) for a in args]
+return _java2py(sc, func(*args))
+
+
+def _callAPI(sc, name, *args):
+""" Call API in PythonMLLibAPI
 """
-def __init__(self, sc, java_model):
+api = getattr(sc._jvm.PythonMLLibAPI(), name)
+return _callJavaFunc(sc, api, *args)
+
+
+class VectorTransformer(object):
+"""
+:: DeveloperApi ::
+Base class for transformation of a vector or RDD of vector
+"""
+def transform(self, vector):
 """
-:param sc:  Spark context
-:param java_model:  Handle to Java model object
+Applies transformation on a vector.
+
+:param vector: vector to be transformed.
 """
+raise NotImplementedError
+
+
+class Normalizer(VectorTransformer):
+"""
+:: Experimental ::
+Normalizes samples individually to unit L^p^ norm
+
+For any 1 <= p <= float('inf'), normalizes samples using
+sum(abs(vector).^p^)^(1/p)^ as norm.
+
+For p = float('inf'), max(abs(vector)) will be used as norm for 
normalization.
+
+>>> v = Vectors.dense(range(3))
+>>> nor = Normalizer(1)
+>>> nor.transform(v)
+DenseVector([0.0, 0.3333, 0.6667])
+
+>>> rdd = sc.parallelize([v])
+>>> nor.transform(rdd).collect()
+[DenseVector([0.0, 0.3333, 0.6667])]
+
+>>> nor2 = Normalizer(float("inf"))
+>>> nor2.transform(v)
+DenseVector([0.0, 0.5, 1.0])
+"""
+def __init__(self, p=2):
+"""
+:param p: Normalization in L^p^ space, p = 2 by default.
+"""
+assert p >= 1.0, "p should be greater than 1.0"
+self.p = float(p)
+
+def transform(self, vector):
+"""
+Applies unit length normalization on a vector.
+
+:param vector: vector to be normalized.
+:return: normalized vector. If the norm of the input is zero, it
+will return the input vector.
+"""
+sc = SparkContext._active_spark_context
+assert s

[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19454184
  
--- Diff: python/pyspark/mllib/feature.py ---
@@ -18,59 +18,348 @@
 """
 Python package for feature in MLlib.
 """
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+   'HashTF', 'IDFModel', 'IDF',
+   'Word2Vec', 'Word2VecModel']
+
+
+# Hack for support float('inf') in Py4j
+old_smart_decode = py4j.protocol.smart_decode
+
+float_str_mapping = {
+u'nan': u'NaN',
+u'inf': u'Infinity',
+u'-inf': u'-Infinity',
+}
+
+
+def new_smart_decode(obj):
+if isinstance(obj, float):
+s = unicode(obj)
+return float_str_mapping.get(s, s)
+return old_smart_decode(obj)
+
+py4j.protocol.smart_decode = new_smart_decode
+
+
+# TODO: move these helper functions into utils
+_picklable_classes = [
+'LinkedList',
+'SparseVector',
+'DenseVector',
+'DenseMatrix',
+'Rating',
+'LabeledPoint',
+]
+
+
+def _py2java(sc, a):
+""" Convert Python object into Java """
+if isinstance(a, RDD):
+a = _to_java_object_rdd(a)
+elif not isinstance(a, (int, long, float, bool, basestring)):
+bytes = bytearray(PickleSerializer().dumps(a))
+a = sc._jvm.SerDe.loads(bytes)
+return a
+
+
+def _java2py(sc, r):
+if isinstance(r, JavaObject):
+clsName = r.getClass().getSimpleName()
+if clsName in ("RDD", "JavaRDD"):
+if clsName == "RDD":
+r = r.toJavaRDD()
+jrdd = sc._jvm.SerDe.javaToPython(r)
+return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
 
-__all__ = ['Word2Vec', 'Word2VecModel']
+elif clsName in _picklable_classes:
+r = sc._jvm.SerDe.dumps(r)
 
+if isinstance(r, bytearray):
+r = PickleSerializer().loads(str(r))
+return r
 
-class Word2VecModel(object):
+
+def _callJavaFunc(sc, func, *args):
+""" Call Java Function
 """
-class for Word2Vec model
+args = [_py2java(sc, a) for a in args]
+return _java2py(sc, func(*args))
+
+
+def _callAPI(sc, name, *args):
+""" Call API in PythonMLLibAPI
 """
-def __init__(self, sc, java_model):
+api = getattr(sc._jvm.PythonMLLibAPI(), name)
+return _callJavaFunc(sc, api, *args)
+
+
+class VectorTransformer(object):
+"""
+:: DeveloperApi ::
+Base class for transformation of a vector or RDD of vector
+"""
+def transform(self, vector):
 """
-:param sc:  Spark context
-:param java_model:  Handle to Java model object
+Applies transformation on a vector.
+
+:param vector: vector to be transformed.
 """
+raise NotImplementedError
+
+
+class Normalizer(VectorTransformer):
+"""
+:: Experimental ::
+Normalizes samples individually to unit L^p^ norm
+
+For any 1 <= p <= float('inf'), normalizes samples using
+sum(abs(vector).^p^)^(1/p)^ as norm.
+
+For p = float('inf'), max(abs(vector)) will be used as norm for 
normalization.
+
+>>> v = Vectors.dense(range(3))
+>>> nor = Normalizer(1)
+>>> nor.transform(v)
+DenseVector([0.0, 0.3333, 0.6667])
+
+>>> rdd = sc.parallelize([v])
+>>> nor.transform(rdd).collect()
+[DenseVector([0.0, 0.3333, 0.6667])]
+
+>>> nor2 = Normalizer(float("inf"))
+>>> nor2.transform(v)
+DenseVector([0.0, 0.5, 1.0])
+"""
+def __init__(self, p=2):
--- End diff --

`2` -> `2.0`?





[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19454187
  
--- Diff: python/pyspark/mllib/feature.py ---
@@ -18,59 +18,348 @@
 """
 Python package for feature in MLlib.
 """
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+   'HashTF', 'IDFModel', 'IDF',
+   'Word2Vec', 'Word2VecModel']
+
+
+# Hack for support float('inf') in Py4j
+old_smart_decode = py4j.protocol.smart_decode
+
+float_str_mapping = {
+u'nan': u'NaN',
+u'inf': u'Infinity',
+u'-inf': u'-Infinity',
+}
+
+
+def new_smart_decode(obj):
+if isinstance(obj, float):
+s = unicode(obj)
+return float_str_mapping.get(s, s)
+return old_smart_decode(obj)
+
+py4j.protocol.smart_decode = new_smart_decode
+
+
+# TODO: move these helper functions into utils
+_picklable_classes = [
+'LinkedList',
+'SparseVector',
+'DenseVector',
+'DenseMatrix',
+'Rating',
+'LabeledPoint',
+]
+
+
+def _py2java(sc, a):
+""" Convert Python object into Java """
+if isinstance(a, RDD):
+a = _to_java_object_rdd(a)
+elif not isinstance(a, (int, long, float, bool, basestring)):
+bytes = bytearray(PickleSerializer().dumps(a))
+a = sc._jvm.SerDe.loads(bytes)
+return a
+
+
+def _java2py(sc, r):
+if isinstance(r, JavaObject):
+clsName = r.getClass().getSimpleName()
+if clsName in ("RDD", "JavaRDD"):
+if clsName == "RDD":
+r = r.toJavaRDD()
+jrdd = sc._jvm.SerDe.javaToPython(r)
+return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
 
-__all__ = ['Word2Vec', 'Word2VecModel']
+elif clsName in _picklable_classes:
+r = sc._jvm.SerDe.dumps(r)
 
+if isinstance(r, bytearray):
+r = PickleSerializer().loads(str(r))
+return r
 
-class Word2VecModel(object):
+
+def _callJavaFunc(sc, func, *args):
+""" Call Java Function
 """
-class for Word2Vec model
+args = [_py2java(sc, a) for a in args]
+return _java2py(sc, func(*args))
+
+
+def _callAPI(sc, name, *args):
+""" Call API in PythonMLLibAPI
 """
-def __init__(self, sc, java_model):
+api = getattr(sc._jvm.PythonMLLibAPI(), name)
+return _callJavaFunc(sc, api, *args)
+
+
+class VectorTransformer(object):
+"""
+:: DeveloperApi ::
+Base class for transformation of a vector or RDD of vector
+"""
+def transform(self, vector):
 """
-:param sc:  Spark context
-:param java_model:  Handle to Java model object
+Applies transformation on a vector.
+
+:param vector: vector to be transformed.
 """
+raise NotImplementedError
+
+
+class Normalizer(VectorTransformer):
+"""
+:: Experimental ::
+Normalizes samples individually to unit L^p^ norm
+
+For any 1 <= p <= float('inf'), normalizes samples using
+sum(abs(vector).^p^)^(1/p)^ as norm.
+
+For p = float('inf'), max(abs(vector)) will be used as norm for 
normalization.
+
+>>> v = Vectors.dense(range(3))
+>>> nor = Normalizer(1)
+>>> nor.transform(v)
+DenseVector([0.0, 0.3333, 0.6667])
+
+>>> rdd = sc.parallelize([v])
+>>> nor.transform(rdd).collect()
+[DenseVector([0.0, 0.3333, 0.6667])]
+
+>>> nor2 = Normalizer(float("inf"))
+>>> nor2.transform(v)
+DenseVector([0.0, 0.5, 1.0])
+"""
+def __init__(self, p=2):
+"""
+:param p: Normalization in L^p^ space, p = 2 by default.
+"""
+assert p >= 1.0, "p should be greater than 1.0"
+self.p = float(p)
+
+def transform(self, vector):
+"""
+Applies unit length normalization on a vector.
+
+:param vector: vector to be normalized.
+:return: normalized vector. If the norm of the input is zero, it
+will return the input vector.
+"""
+sc = SparkContext._active_spark_context
+assert s

[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19454177
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala ---
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.feature
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.rdd.RDD
+import org.apache.spark.api.java.JavaRDD
--- End diff --

organize imports in alphabetical order


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19454181
  
--- Diff: python/pyspark/mllib/feature.py ---
@@ -18,59 +18,348 @@
 """
 Python package for feature in MLlib.
 """
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+   'HashTF', 'IDFModel', 'IDF',
+   'Word2Vec', 'Word2VecModel']
+
+
+# Hack for support float('inf') in Py4j
+old_smart_decode = py4j.protocol.smart_decode
+
+float_str_mapping = {
+u'nan': u'NaN',
+u'inf': u'Infinity',
+u'-inf': u'-Infinity',
+}
+
+
+def new_smart_decode(obj):
+if isinstance(obj, float):
+s = unicode(obj)
+return float_str_mapping.get(s, s)
+return old_smart_decode(obj)
+
+py4j.protocol.smart_decode = new_smart_decode
+
+
+# TODO: move these helper functions into utils
+_picklable_classes = [
+'LinkedList',
+'SparseVector',
+'DenseVector',
+'DenseMatrix',
+'Rating',
+'LabeledPoint',
+]
+
+
+def _py2java(sc, a):
+""" Convert Python object into Java """
+if isinstance(a, RDD):
+a = _to_java_object_rdd(a)
+elif not isinstance(a, (int, long, float, bool, basestring)):
+bytes = bytearray(PickleSerializer().dumps(a))
+a = sc._jvm.SerDe.loads(bytes)
+return a
+
+
+def _java2py(sc, r):
+if isinstance(r, JavaObject):
+clsName = r.getClass().getSimpleName()
+if clsName in ("RDD", "JavaRDD"):
+if clsName == "RDD":
+r = r.toJavaRDD()
+jrdd = sc._jvm.SerDe.javaToPython(r)
+return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
 
-__all__ = ['Word2Vec', 'Word2VecModel']
+elif clsName in _picklable_classes:
+r = sc._jvm.SerDe.dumps(r)
 
+if isinstance(r, bytearray):
+r = PickleSerializer().loads(str(r))
+return r
 
-class Word2VecModel(object):
+
+def _callJavaFunc(sc, func, *args):
+""" Call Java Function
 """
-class for Word2Vec model
+args = [_py2java(sc, a) for a in args]
+return _java2py(sc, func(*args))
+
+
+def _callAPI(sc, name, *args):
+""" Call API in PythonMLLibAPI
 """
-def __init__(self, sc, java_model):
+api = getattr(sc._jvm.PythonMLLibAPI(), name)
+return _callJavaFunc(sc, api, *args)
+
+
+class VectorTransformer(object):
+"""
+:: DeveloperApi ::
+Base class for transformation of a vector or RDD of vector
+"""
+def transform(self, vector):
 """
-:param sc:  Spark context
-:param java_model:  Handle to Java model object
+Applies transformation on a vector.
+
+:param vector: vector to be transformed.
 """
+raise NotImplementedError
+
+
+class Normalizer(VectorTransformer):
+"""
+:: Experimental ::
+Normalizes samples individually to unit L^p^ norm
--- End diff --

`L^p^` doesn't show up correctly in the generated doc. This is `L` with 
subscript `p`, so with Sphinx it should be

~~~
L\ :sub:`p`\ norm
~~~
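
For reference, the norm the docstring is describing, written out (this is just the standard definition, not taken from the patch):

~~~
\|x\|_p = \Big(\sum_i |x_i|^p\Big)^{1/p}, \quad 1 \le p \le \infty, \qquad
\|x\|_\infty = \max_i |x_i|, \qquad \text{transform}(x) = x / \|x\|_p
~~~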





[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19454172
  
--- Diff: docs/mllib-feature-extraction.md ---
@@ -267,4 +346,25 @@ val data1 = data.map(x => (x.label, 
normalizer1.transform(x.features)))
 val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
 {% endhighlight %}
 
+
+
+{% highlight python %}
+from pyspark.mllib.util import MLUtils
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.feature import Normalizer
+
+data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
+label = data.map(lambda x: x.label)
--- End diff --

`label` -> `labels`





[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19454179
  
--- Diff: python/pyspark/mllib/feature.py ---
@@ -18,59 +18,348 @@
 """
 Python package for feature in MLlib.
 """
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+   'HashTF', 'IDFModel', 'IDF',
--- End diff --

`HashTF` -> `HashingTF`





[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19454164
  
--- Diff: docs/mllib-feature-extraction.md ---
@@ -162,6 +204,20 @@ for((synonym, cosineSimilarity) <- synonyms) {
 }
 {% endhighlight %}
 
+
--- End diff --

Shall we skip the python examples for `Word2Vec` in this PR? #2952 added 
example code for it.





[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19454166
  
--- Diff: docs/mllib-feature-extraction.md ---
@@ -267,4 +346,25 @@ val data1 = data.map(x => (x.label, 
normalizer1.transform(x.features)))
 val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
 {% endhighlight %}
 
+
+
+{% highlight python %}
+from pyspark.mllib.util import MLUtils
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.feature import Normalizer
+
+data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
+label = data.map(lambda x: x.label)
--- End diff --

`label` is not used
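
One way the example could avoid the unused binding is to keep the labels and pair them back with the normalized features, mirroring the Scala snippet above (a sketch, assuming `sc` is an existing SparkContext):

```python
from pyspark.mllib.util import MLUtils
from pyspark.mllib.feature import Normalizer

data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
labels = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)

normalizer1 = Normalizer()                 # L^2 normalization by default
normalizer2 = Normalizer(p=float("inf"))   # L^infinity normalization

# Pair each label with its normalized feature vector.
data1 = labels.zip(normalizer1.transform(features))
data2 = labels.zip(normalizer2.transform(features))
```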


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...

2014-10-27 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/2819#discussion_r19454162
  
--- Diff: docs/mllib-feature-extraction.md ---
@@ -95,8 +95,50 @@ tf.cache()
 val idf = new IDF(minDocFreq = 2).fit(tf)
 val tfidf: RDD[Vector] = idf.transform(tf)
 {% endhighlight %}
+
+
+
+TF and IDF are implemented in 
[HashingTF](api/python/pyspark.mllib.html#pyspark.mllib.feature.HashingTF)
+and [IDF](api/python/pyspark.mllib.html#pyspark.mllib.feature.IDF).
+`HashingTF` takes an RDD of lists as input.
+Each record can be an iterable of strings or other types.
+
+{% highlight python %}
+from pyspark import SparkContext
+from pyspark.mllib.linalg import Vector
--- End diff --

`Vector` is not used
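
A sketch of what the corrected example might look like without the unused import (assuming `sc` is an existing SparkContext and that the Python `IDF` accepts `minDocFreq` like its Scala counterpart; the input path is only illustrative):

```python
from pyspark.mllib.feature import HashingTF, IDF

# Each document is represented as a list of terms.
documents = sc.textFile("data/mllib/kmeans_data.txt").map(lambda line: line.split(" "))

hashingTF = HashingTF()
tf = hashingTF.transform(documents)

# IDF needs two passes over the data: one to fit, one to transform.
tf.cache()
idf = IDF(minDocFreq=2).fit(tf)
tfidf = idf.transform(tf)
```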


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

2014-10-27 Thread codedeft
Github user codedeft commented on the pull request:

https://github.com/apache/spark/pull/2868#issuecomment-60712109
  
Currently doing some performance testing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3343] [SQL] Add serde support for CTAS

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2570#issuecomment-60711633
  
  [Test build #22337 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22337/consoleFull)
 for   PR 2570 at commit 
[`e011ef5`](https://github.com/apache/spark/commit/e011ef557be3f438c5052e65462d5fbb89b51b6d).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4084] Reuse sort key in Sorter

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2937#issuecomment-60711412
  
  [Test build #22336 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22336/consoleFull)
 for   PR 2937 at commit 
[`c63927f`](https://github.com/apache/spark/commit/c63927f26473a3dbe664bc3621aeea12109f7afb).
 * This patch **fails Python style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4084] Reuse sort key in Sorter

2014-10-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2937#issuecomment-60711414
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22336/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4084] Reuse sort key in Sorter

2014-10-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2937#issuecomment-60711341
  
  [Test build #22336 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22336/consoleFull)
 for   PR 2937 at commit 
[`c63927f`](https://github.com/apache/spark/commit/c63927f26473a3dbe664bc3621aeea12109f7afb).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


