[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/2871#issuecomment-60716720 LGTM, too. Very clever testing strategy! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4108][sql]
Github user liancheng commented on the pull request: https://github.com/apache/spark/pull/2970#issuecomment-60716619 LGTM, thanks!
[GitHub] spark pull request: [SPARK-4108][sql]
Github user liancheng commented on the pull request: https://github.com/apache/spark/pull/2970#issuecomment-60716625 Test this please
[GitHub] spark pull request: fix broken links in README.md
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/2859
[GitHub] spark pull request: [SPARK-3904] [SQL] add constant objectinspecto...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2762#issuecomment-60716603 [Test build #22345 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22345/consoleFull) for PR 2762 at commit [`782722f`](https://github.com/apache/spark/commit/782722fc78e00f44cb1c23645cd57b819edceac9). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/2871#discussion_r19455404
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -157,14 +161,12 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     out.defaultWriteObject()
   }

-  /** Used by the JVM when deserializing this object. */
-  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
-    in.defaultReadObject()
+  private def readBroadcastBlock(): T = Utils.tryOrIOException {
--- End diff --
I don't think that `Utils.tryOrIOException` is _strictly_ necessary here, since this method will no longer be called from `readObject()`, so we won't have to worry about any exceptions that it throws being swallowed inside of a Java deserializer. I added the `tryOrIOException` blocks as part of a general campaign to combat error-reporting issues in our custom Externalizable / Serializable classes. It's fine to use this here, though, if you want to restrict the range of possible exceptions that can be thrown when calling `getValue()`.
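Josh's point is that the wrapper matters most inside `readObject()`, where a Java deserializer can swallow exceptions. As a rough Python analog (not Spark's Scala helper), the sketch below shows the narrowing idea: `OSError`, Python's counterpart of `IOException`, passes through unchanged, and any other `Exception` is re-raised as an `OSError` with the original chained as its cause. The names `try_or_ioerror` and `read_broadcast_block` are hypothetical.

```python
import functools


def try_or_ioerror(func):
    """Hypothetical decorator: keep OSError as-is, wrap other exceptions in OSError."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except OSError:
            # Already an I/O error: let it propagate unchanged.
            raise
        except Exception as e:
            # Narrow every other Exception subclass to OSError, keeping the
            # original as __cause__ so the real failure stays visible.
            raise OSError("wrapped: %r" % (e,)) from e
    return wrapper


@try_or_ioerror
def read_broadcast_block(blocks):
    # Hypothetical stand-in for a deserialization step that may fail.
    if not blocks:
        raise ValueError("no blocks fetched")
    return b"".join(blocks)
```

Chaining with `from e` keeps the original traceback attached, which is the error-reporting benefit the comment describes; callers only need to handle one exception type.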
[GitHub] spark pull request: [SPARK-3904] [SQL] add constant objectinspecto...
Github user liancheng commented on the pull request: https://github.com/apache/spark/pull/2762#issuecomment-60716472 test this please
[GitHub] spark pull request: fix broken links in README.md
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2859#issuecomment-60716464 Merging this. Thanks.
[GitHub] spark pull request: [ SPARK-1812] Adjust build system and tests to...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2615#issuecomment-60716442 [Test build #22343 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22343/consoleFull) for PR 2615 at commit [`22858c8`](https://github.com/apache/spark/commit/22858c8cac6aa94f250c851f24c357ff853faf23). * This patch merges cleanly.
[GitHub] spark pull request: [Spark 3922] Refactor spark-core to use Utils....
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2781#issuecomment-60716440 Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2969#discussion_r19455324
--- Diff: core/src/main/scala/org/apache/spark/storage/TachyonStore.scala ---
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
       return None
     }
     val is = file.getInStream(ReadType.CACHE)
-    var buffer: ByteBuffer = null
+    assert (is != null)
     try {
-      if (is != null) {
-        val size = file.length
-        val bs = new Array[Byte](size.asInstanceOf[Int])
-        val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
-        buffer = ByteBuffer.wrap(bs)
-        if (fetchSize != size) {
-          logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " +
-            s"is not equal to fetched size $fetchSize")
-          return None
-        }
-      }
+      val size = file.length
+      val bs = new Array[Byte](size.asInstanceOf[Int])
+      ByteStreams.readFully(is, bs)
+      Some(ByteBuffer.wrap(bs))
     } catch {
--- End diff --
ok you convinced me on this one. are there any other type of exceptions (not errors) that can be thrown? we shouldn't throw a random exception when the implicit contract of the api is already None - if it fails?
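The diff swaps a single `is.read(bs, 0, size)` for Guava's `ByteStreams.readFully(is, bs)`. One `read()` call may legally return fewer bytes than requested, so the old `fetchSize != size` check could reject a healthy block. The Python sketch below illustrates the loop a read-fully helper performs, assuming a file-like object whose `readinto()` can return short counts; `read_fully` and `ChunkyStream` are hypothetical names, and raising `EOFError` on a truncated stream is this sketch's own choice.

```python
import io


def read_fully(stream, size):
    """Read exactly `size` bytes, looping because one readinto() may return less."""
    buf = bytearray(size)
    view = memoryview(buf)
    filled = 0
    while filled < size:
        n = stream.readinto(view[filled:])
        if not n:
            # 0 means end of stream (None, from a non-blocking stream with no
            # data ready, is treated the same way in this sketch).
            raise EOFError("expected %d bytes, got %d" % (size, filled))
        filled += n
    return bytes(buf)


class ChunkyStream(io.RawIOBase):
    """In-memory stream that hands out at most `chunk` bytes per read call."""

    def __init__(self, data, chunk):
        super().__init__()
        self._data = data
        self._pos = 0
        self._chunk = chunk

    def readable(self):
        return True

    def readinto(self, b):
        n = min(len(b), self._chunk, len(self._data) - self._pos)
        b[:n] = self._data[self._pos:self._pos + n]
        self._pos += n
        return n
```

With a 4-byte chunk size, a single `read(11)` on `ChunkyStream(b"hello world", 4)` returns only `b"hell"`, which is exactly the short read the old size check treated as a failure; `read_fully` keeps reading until the buffer is full.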
[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2871#discussion_r19455293
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -173,15 +175,21 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
           val time = (System.nanoTime() - start) / 1e9
           logInfo("Reading broadcast variable " + id + " took " + time + " s")
--- End diff --
actually if u need to change josh's other comment, can u fix this one as well? :) thanks.
[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/2871#discussion_r19455263
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -173,15 +175,21 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
           val time = (System.nanoTime() - start) / 1e9
           logInfo("Reading broadcast variable " + id + " took " + time + " s")

-          _value =
-            TorrentBroadcast.unBlockifyObject[T](blocks, SparkEnv.get.serializer, compressionCodec)
+          val obj = TorrentBroadcast.unBlockifyObject[T](
+            blocks, SparkEnv.get.serializer, compressionCodec)
           // Store the merged copy in BlockManager so other tasks on this executor don't
           // need to re-fetch it.
           SparkEnv.get.blockManager.putSingle(
-            broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+            broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+          obj
       }
     }
   }
+
+  /** Used by the JVM when deserializing this object. */
+  private def readObject(in: ObjectInputStream) {
--- End diff --
Do we still need this method if it's just going to call the default readObject?
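In this diff the fetch logic lives in `readBroadcastBlock()` rather than `readObject()`, so deserializing the broadcast object no longer reads its blocks; and, as Josh notes, a `readObject()` that only calls `in.defaultReadObject()` repeats what Java serialization does anyway. A minimal Python sketch of the same lazy pattern using `pickle`, where `LazyBroadcast`, `STORE`, and `FETCHES` are hypothetical: only the id is serialized, the class defines no `__setstate__` (so unpickling uses the default attribute restore), and the value is fetched and cached on first access.

```python
import pickle

# Hypothetical stand-ins for the block store and a log of remote fetches.
STORE = {7: [1, 2, 3, 4]}
FETCHES = []


class LazyBroadcast:
    """Carries only a broadcast id; the value is fetched on first access."""

    def __init__(self, bid):
        self.bid = bid
        self._value = None  # local cache, never shipped

    def __getstate__(self):
        # Serialize the id only; the cache is dropped (like a @transient field).
        # No __setstate__ is defined, so pickle's default restore applies.
        return {"bid": self.bid, "_value": None}

    @property
    def value(self):
        if self._value is None:
            FETCHES.append(self.bid)  # simulate reading the blocks
            self._value = list(STORE[self.bid])
        return self._value


if __name__ == "__main__":
    copy = pickle.loads(pickle.dumps(LazyBroadcast(7)))
    print(len(FETCHES))  # unpickling alone fetched nothing
    print(copy.value)
```

In this sketch, an object that carries a `LazyBroadcast` but never reads `.value` never triggers a fetch, and repeated reads hit the cache.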
[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/2871#discussion_r19455214
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -173,15 +175,21 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
           val time = (System.nanoTime() - start) / 1e9
           logInfo("Reading broadcast variable " + id + " took " + time + " s")
--- End diff --
+1. I've seen several cases where this type of human-friendly formatting has resulted in log messages that report things like "spilling 0 MB to disk" or "task took 0 seconds", which isn't helpful while debugging.
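The concern is that coarse units round short durations down to zero. A minimal Python sketch, assuming `time.monotonic_ns()` as the clock in place of Scala's `System.nanoTime()`, with hypothetical helper names, contrasts whole-second formatting with integer milliseconds:

```python
import time


def coarse_message(bid, start_ns, end_ns):
    # Whole seconds: any read shorter than 1 s is reported as "0 s".
    return "Reading broadcast variable %d took %d s" % (
        bid, (end_ns - start_ns) // 1_000_000_000)


def elapsed_message(bid, start_ns, end_ns):
    # Integer milliseconds keep sub-second reads distinguishable.
    return "Reading broadcast variable %d took %d ms" % (
        bid, (end_ns - start_ns) // 1_000_000)


if __name__ == "__main__":
    start = time.monotonic_ns()
    # ... fetch and merge blocks here ...
    print(elapsed_message(5, start, time.monotonic_ns()))
```

For a 42.5 ms read, the coarse version logs "took 0 s" while the millisecond version logs "took 42 ms".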
[GitHub] spark pull request: [ SPARK-1812] Adjust build system and tests to...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2615#issuecomment-60715842 [Test build #22342 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22342/consoleFull) for PR 2615 at commit [`e5786c8`](https://github.com/apache/spark/commit/e5786c8948971bd408bb6e3ebb05f72e9b75b915). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19455162
--- Diff: python/pyspark/mllib/feature.py ---
@@ -18,59 +18,348 @@
 """
 Python package for feature in MLlib.
 """
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+           'HashTF', 'IDFModel', 'IDF',
+           'Word2Vec', 'Word2VecModel']
+
+
+# Hack for support float('inf') in Py4j
+old_smart_decode = py4j.protocol.smart_decode
+
+float_str_mapping = {
+    u'nan': u'NaN',
+    u'inf': u'Infinity',
+    u'-inf': u'-Infinity',
+}
+
+
+def new_smart_decode(obj):
+    if isinstance(obj, float):
+        s = unicode(obj)
+        return float_str_mapping.get(s, s)
+    return old_smart_decode(obj)
+
+py4j.protocol.smart_decode = new_smart_decode
+
+
+# TODO: move these helper functions into utils
+_picklable_classes = [
+    'LinkedList',
+    'SparseVector',
+    'DenseVector',
+    'DenseMatrix',
+    'Rating',
+    'LabeledPoint',
+]
+
+
+def _py2java(sc, a):
+    """ Convert Python object into Java """
+    if isinstance(a, RDD):
+        a = _to_java_object_rdd(a)
+    elif not isinstance(a, (int, long, float, bool, basestring)):
+        bytes = bytearray(PickleSerializer().dumps(a))
+        a = sc._jvm.SerDe.loads(bytes)
+    return a
+
+
+def _java2py(sc, r):
+    if isinstance(r, JavaObject):
+        clsName = r.getClass().getSimpleName()
+        if clsName in ("RDD", "JavaRDD"):
+            if clsName == "RDD":
+                r = r.toJavaRDD()
+            jrdd = sc._jvm.SerDe.javaToPython(r)
+            return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))

-__all__ = ['Word2Vec', 'Word2VecModel']
+        elif clsName in _picklable_classes:
+            r = sc._jvm.SerDe.dumps(r)

+    if isinstance(r, bytearray):
+        r = PickleSerializer().loads(str(r))
+    return r

-class Word2VecModel(object):
+
+def _callJavaFunc(sc, func, *args):
+    """ Call Java Function
     """
-    class for Word2Vec model
+    args = [_py2java(sc, a) for a in args]
+    return _java2py(sc, func(*args))
+
+
+def _callAPI(sc, name, *args):
+    """ Call API in PythonMLLibAPI
     """
-    def __init__(self, sc, java_model):
+    api = getattr(sc._jvm.PythonMLLibAPI(), name)
+    return _callJavaFunc(sc, api, *args)
+
+
+class VectorTransformer(object):
+    """
+    :: DeveloperApi ::
+    Base class for transformation of a vector or RDD of vector
+    """
+    def transform(self, vector):
         """
-        :param sc: Spark context
-        :param java_model: Handle to Java model object
+        Applies transformation on a vector.
+
+        :param vector: vector to be transformed.
         """
+        raise NotImplementedError
+
+
+class Normalizer(VectorTransformer):
+    """
+    :: Experimental ::
+    Normalizes samples individually to unit L^p^ norm
+
+    For any 1 <= p <= float('inf'), normalizes samples using
+    sum(abs(vector).^p^)^(1/p)^ as norm.
+
+    For p = float('inf'), max(abs(vector)) will be used as norm for normalization.
+
+    >>> v = Vectors.dense(range(3))
+    >>> nor = Normalizer(1)
+    >>> nor.transform(v)
+    DenseVector([0.0, 0.3333, 0.6667])
+
+    >>> rdd = sc.parallelize([v])
+    >>> nor.transform(rdd).collect()
+    [DenseVector([0.0, 0.3333, 0.6667])]
+
+    >>> nor2 = Normalizer(float("inf"))
+    >>> nor2.transform(v)
+    DenseVector([0.0, 0.5, 1.0])
+    """
+    def __init__(self, p=2):
+        """
+        :param p: Normalization in L^p^ space, p = 2 by default.
+        """
+        assert p >= 1.0, "p should be greater than 1.0"
+        self.p = float(p)
+
+    def transform(self, vector):
+        """
+        Applies unit length normalization on a vector.
+
+        :param vector: vector to be normalized.
+        :return: normalized vector. If the norm of the input is zero, it
+                 will return the input vector.
+        """
+        sc = SparkContext._active_spark_context
+        assert s
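The `new_smart_decode` hack in this diff appears to exist because Python spells special floats `inf`, `-inf`, and `nan`, while Java's `Double.parseDouble` accepts `Infinity`, `-Infinity`, and `NaN` instead. The sketch below isolates just that mapping in plain Python 3 (`str` in place of the diff's Python 2 `unicode`), without Py4J; `java_float_literal` is a hypothetical name.

```python
# Python's str() spellings of special floats -> the spellings Java parses.
FLOAT_STR_MAPPING = {
    "nan": "NaN",
    "inf": "Infinity",
    "-inf": "-Infinity",
}


def java_float_literal(x):
    """Render a Python float as a string Java's Double.parseDouble accepts."""
    s = str(x)
    # Ordinary values such as "2.5" pass through unchanged.
    return FLOAT_STR_MAPPING.get(s, s)
```

Keeping the table separate from the Py4J patch makes the translation easy to test on its own; the diff's TODO about moving helpers into a utils module points in the same direction.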
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19455165
--- Diff: python/pyspark/mllib/feature.py ---
@@ -95,33 +385,26 @@ class Word2Vec(object):
     >>> localDoc = [sentence, sentence]
     >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
     >>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc)
+
     >>> syms = model.findSynonyms("a", 2)
-    >>> str(syms[0][0])
-    'b'
-    >>> str(syms[1][0])
-    'c'
-    >>> len(syms)
-    2
+    >>> [s[0] for s in syms]
+    [u'b', u'c']
     >>> vec = model.transform("a")
-    >>> len(vec)
-    10
     >>> syms = model.findSynonyms(vec, 2)
-    >>> str(syms[0][0])
-    'b'
-    >>> str(syms[1][0])
-    'c'
-    >>> len(syms)
-    2
+    >>> [s[0] for s in syms]
+    [u'b', u'c']
     """
     def __init__(self):
         """
         Construct Word2Vec instance
         """
+        import random  # this can't be on the top because of mllib.random
+
         self.vectorSize = 100
         self.learningRate = 0.025
         self.numPartitions = 1
         self.numIterations = 1
-        self.seed = 42L
+        self.seed = random.randint(0, sys.maxint)
--- End diff --
ok
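The diff replaces the fixed default seed `42L` with `random.randint(0, sys.maxint)`, so results are reproducible only when the caller sets a seed explicitly, as the doctest does with `setSeed(42L)`. A hypothetical Python 3 sketch of that pattern with chainable setters follows; `sys.maxsize` stands in for Python 2's `sys.maxint`, and `Word2VecParams` is not a PySpark class.

```python
import random
import sys


class Word2VecParams:
    """Hypothetical parameter holder mirroring the fluent setters in the doctest."""

    def __init__(self):
        self.vectorSize = 100
        # A fresh random seed per instance: unrelated runs differ by default.
        self.seed = random.randint(0, sys.maxsize)

    def setVectorSize(self, vectorSize):
        self.vectorSize = int(vectorSize)
        return self  # returning self allows chaining

    def setSeed(self, seed):
        self.seed = int(seed)
        return self
```

Because every setter returns `self`, `Word2VecParams().setVectorSize(10).setSeed(42)` pins both values in one expression, which is how the doctest keeps its expected synonyms stable.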
[GitHub] spark pull request: [ SPARK-1812] Adjust build system and tests to...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2615#issuecomment-60715844 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22342/ Test FAILed.
[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/2969#discussion_r19455153
--- Diff: core/src/main/scala/org/apache/spark/storage/TachyonStore.scala ---
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
       return None
     }
     val is = file.getInStream(ReadType.CACHE)
-    var buffer: ByteBuffer = null
+    assert (is != null)
     try {
-      if (is != null) {
-        val size = file.length
-        val bs = new Array[Byte](size.asInstanceOf[Int])
-        val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
-        buffer = ByteBuffer.wrap(bs)
-        if (fetchSize != size) {
-          logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " +
-            s"is not equal to fetched size $fetchSize")
-          return None
-        }
-      }
+      val size = file.length
+      val bs = new Array[Byte](size.asInstanceOf[Int])
+      ByteStreams.readFully(is, bs)
+      Some(ByteBuffer.wrap(bs))
     } catch {
--- End diff --
Ah, gotcha. From a general API design perspective, I think it's a little weird to have methods that swallow OOMs and resurface them as other errors. As a library consumer, how would you feel if, say, Snappy were to swallow OOMs and re-throw them as IOException? A library with that sort of unexpected behavior might actually break user applications' ability to recover from OOMs: if we ran out of memory due to excessive memory usage in some other part of the app but the OOM happened to be caught and swallowed by the library, then the top-level uncaught exception handler might never get a chance to respond to the OOM in an application-specific way (e.g. attempt to close files, trigger GCs, or clear caches). I'm not against handling OOMs in principle, but I think that it should probably be done at higher levels of the stack since that seems easier to reason about.
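Josh's design point, together with rxin's question about which exceptions to translate, can be sketched in Python: map only the expected I/O failure onto the `None` contract and let everything else, including `MemoryError` (Python's rough counterpart of an OOM), propagate to a higher layer. `get_bytes` and its `open_stream` parameter are hypothetical; this is not the `TachyonStore` code.

```python
import io


def get_bytes(open_stream, size):
    """Return `size` bytes from a freshly opened stream, or None on an I/O failure.

    Only OSError is mapped to None. Anything else, including MemoryError,
    propagates so a higher layer can decide how to recover.
    """
    try:
        with open_stream() as stream:
            return stream.read(size)
    except OSError:
        return None


if __name__ == "__main__":
    print(get_bytes(lambda: io.BytesIO(b"abcd"), 4))
```

Narrowing the `except` clause keeps the library's contract ("`None` if the block can't be read") without hiding resource exhaustion that happened elsewhere in the application.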
[GitHub] spark pull request: [SPARK-4097] Fix the race condition of 'thread...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2957#discussion_r19455126
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -210,7 +210,9 @@ class ComplexFutureAction[T] extends FutureAction[T] {
       } catch {
         case e: Exception => p.failure(e)
       } finally {
-        thread = null
+        ComplexFutureAction.this.synchronized {
--- End diff --
what effect does this synchronized do? it seems to me it is actually not protecting anything
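A lock around a write only protects something if another code path reads the same field under the same lock. The excerpt does not show such a reader, so the Python sketch below assumes one: a `cancel()` that checks the field and then uses it (standing in for interrupting the worker thread). Taking the lock in `finish()` guarantees the field cannot be cleared between that check and that use. `ComplexAction`, `start`, `finish`, and `cancel` are hypothetical names built on `threading.Lock`.

```python
import threading


class ComplexAction:
    """Hypothetical analog of a future that can interrupt its worker."""

    def __init__(self):
        self._lock = threading.Lock()
        self.worker = None      # set while a job runs, cleared when it ends
        self.interrupted = []   # records which workers cancel() acted on

    def start(self, name):
        with self._lock:
            self.worker = name

    def finish(self):
        # Clearing under the same lock as cancel() means the field can never
        # change between cancel()'s None check and its use of the value.
        with self._lock:
            self.worker = None

    def cancel(self):
        with self._lock:
            if self.worker is not None:
                self.interrupted.append(self.worker)  # stands in for interrupt()
                return True
            return False
```

If `finish()` wrote without the lock, the write itself would still happen, but it could land between `cancel()`'s check and its use; the lock is what makes that check-then-act sequence atomic.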
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19455090
--- Diff: python/pyspark/mllib/feature.py ---
@@ -18,59 +18,348 @@
 """
 Python package for feature in MLlib.
 """
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+           'HashTF', 'IDFModel', 'IDF',
+           'Word2Vec', 'Word2VecModel']
+
+
+# Hack for support float('inf') in Py4j
+old_smart_decode = py4j.protocol.smart_decode
+
+float_str_mapping = {
+    u'nan': u'NaN',
+    u'inf': u'Infinity',
+    u'-inf': u'-Infinity',
+}
+
+
+def new_smart_decode(obj):
+    if isinstance(obj, float):
+        s = unicode(obj)
+        return float_str_mapping.get(s, s)
+    return old_smart_decode(obj)
+
+py4j.protocol.smart_decode = new_smart_decode
+
+
+# TODO: move these helper functions into utils
+_picklable_classes = [
+    'LinkedList',
+    'SparseVector',
+    'DenseVector',
+    'DenseMatrix',
+    'Rating',
+    'LabeledPoint',
+]
+
+
+def _py2java(sc, a):
+    """ Convert Python object into Java """
+    if isinstance(a, RDD):
+        a = _to_java_object_rdd(a)
+    elif not isinstance(a, (int, long, float, bool, basestring)):
+        bytes = bytearray(PickleSerializer().dumps(a))
+        a = sc._jvm.SerDe.loads(bytes)
+    return a
+
+
+def _java2py(sc, r):
+    if isinstance(r, JavaObject):
+        clsName = r.getClass().getSimpleName()
+        if clsName in ("RDD", "JavaRDD"):
+            if clsName == "RDD":
+                r = r.toJavaRDD()
+            jrdd = sc._jvm.SerDe.javaToPython(r)
+            return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))

-__all__ = ['Word2Vec', 'Word2VecModel']
+        elif clsName in _picklable_classes:
+            r = sc._jvm.SerDe.dumps(r)

+    if isinstance(r, bytearray):
+        r = PickleSerializer().loads(str(r))
+    return r

-class Word2VecModel(object):
+
+def _callJavaFunc(sc, func, *args):
+    """ Call Java Function
     """
-    class for Word2Vec model
+    args = [_py2java(sc, a) for a in args]
+    return _java2py(sc, func(*args))
+
+
+def _callAPI(sc, name, *args):
+    """ Call API in PythonMLLibAPI
     """
-    def __init__(self, sc, java_model):
+    api = getattr(sc._jvm.PythonMLLibAPI(), name)
+    return _callJavaFunc(sc, api, *args)
+
+
+class VectorTransformer(object):
+    """
+    :: DeveloperApi ::
+    Base class for transformation of a vector or RDD of vector
+    """
+    def transform(self, vector):
         """
-        :param sc: Spark context
-        :param java_model: Handle to Java model object
+        Applies transformation on a vector.
+
+        :param vector: vector to be transformed.
         """
+        raise NotImplementedError
+
+
+class Normalizer(VectorTransformer):
+    """
+    :: Experimental ::
+    Normalizes samples individually to unit L^p^ norm
+
+    For any 1 <= p <= float('inf'), normalizes samples using
+    sum(abs(vector).^p^)^(1/p)^ as norm.
+
+    For p = float('inf'), max(abs(vector)) will be used as norm for normalization.
+
+    >>> v = Vectors.dense(range(3))
+    >>> nor = Normalizer(1)
+    >>> nor.transform(v)
+    DenseVector([0.0, 0.3333, 0.6667])
+
+    >>> rdd = sc.parallelize([v])
+    >>> nor.transform(rdd).collect()
+    [DenseVector([0.0, 0.3333, 0.6667])]
+
+    >>> nor2 = Normalizer(float("inf"))
+    >>> nor2.transform(v)
+    DenseVector([0.0, 0.5, 1.0])
+    """
+    def __init__(self, p=2):
--- End diff --
It will be converted into float, but having "2.0" here will be better for docs.
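Since `__init__` calls `float(p)`, the default behaves the same either way; writing it as `2.0` simply documents that `p` is real-valued. A pure-Python sketch of the normalization the doctest describes, using plain lists rather than PySpark vectors (`normalize` is a hypothetical helper, not PySpark's `Normalizer`):

```python
import math


def normalize(values, p=2.0):
    """Scale `values` to unit L^p norm; return them unchanged if the norm is 0."""
    p = float(p)
    if p < 1.0:
        raise ValueError("p must be >= 1.0")
    if math.isinf(p):
        # L^inf norm: the largest absolute value.
        norm = max(abs(v) for v in values)
    else:
        norm = sum(abs(v) ** p for v in values) ** (1.0 / p)
    if norm == 0.0:
        return list(values)
    return [v / norm for v in values]
```

For `[0.0, 1.0, 2.0]`, `p = 1` divides by the L^1 norm 3 (roughly `[0.0, 0.3333, 0.6667]`), while `p = float("inf")` divides by the maximum 2 and gives `[0.0, 0.5, 1.0]`, matching the two doctest cases.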
[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2871#issuecomment-60715459 LGTM
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19455072
--- Diff: python/pyspark/mllib/feature.py ---
@@ -18,59 +18,348 @@
 """
 Python package for feature in MLlib.
 """
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+           'HashTF', 'IDFModel', 'IDF',
+           'Word2Vec', 'Word2VecModel']
+
+
+# Hack for support float('inf') in Py4j
+old_smart_decode = py4j.protocol.smart_decode
+
+float_str_mapping = {
+    u'nan': u'NaN',
+    u'inf': u'Infinity',
+    u'-inf': u'-Infinity',
+}
+
+
+def new_smart_decode(obj):
+    if isinstance(obj, float):
+        s = unicode(obj)
+        return float_str_mapping.get(s, s)
+    return old_smart_decode(obj)
+
+py4j.protocol.smart_decode = new_smart_decode
+
+
+# TODO: move these helper functions into utils
+_picklable_classes = [
+    'LinkedList',
+    'SparseVector',
+    'DenseVector',
+    'DenseMatrix',
+    'Rating',
+    'LabeledPoint',
+]
+
+
+def _py2java(sc, a):
+    """ Convert Python object into Java """
+    if isinstance(a, RDD):
+        a = _to_java_object_rdd(a)
+    elif not isinstance(a, (int, long, float, bool, basestring)):
+        bytes = bytearray(PickleSerializer().dumps(a))
+        a = sc._jvm.SerDe.loads(bytes)
+    return a
+
+
+def _java2py(sc, r):
+    if isinstance(r, JavaObject):
+        clsName = r.getClass().getSimpleName()
+        if clsName in ("RDD", "JavaRDD"):
+            if clsName == "RDD":
+                r = r.toJavaRDD()
+            jrdd = sc._jvm.SerDe.javaToPython(r)
+            return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))

-__all__ = ['Word2Vec', 'Word2VecModel']
+        elif clsName in _picklable_classes:
+            r = sc._jvm.SerDe.dumps(r)

+    if isinstance(r, bytearray):
+        r = PickleSerializer().loads(str(r))
+    return r

-class Word2VecModel(object):
+
+def _callJavaFunc(sc, func, *args):
+    """ Call Java Function
     """
-    class for Word2Vec model
+    args = [_py2java(sc, a) for a in args]
+    return _java2py(sc, func(*args))
+
+
+def _callAPI(sc, name, *args):
+    """ Call API in PythonMLLibAPI
     """
-    def __init__(self, sc, java_model):
+    api = getattr(sc._jvm.PythonMLLibAPI(), name)
+    return _callJavaFunc(sc, api, *args)
+
+
+class VectorTransformer(object):
+    """
+    :: DeveloperApi ::
+    Base class for transformation of a vector or RDD of vector
+    """
+    def transform(self, vector):
         """
-        :param sc: Spark context
-        :param java_model: Handle to Java model object
+        Applies transformation on a vector.
+
+        :param vector: vector to be transformed.
         """
+        raise NotImplementedError
+
+
+class Normalizer(VectorTransformer):
+    """
+    :: Experimental ::
+    Normalizes samples individually to unit L^p^ norm
+
+    For any 1 <= p <= float('inf'), normalizes samples using
+    sum(abs(vector).^p^)^(1/p)^ as norm.
+
+    For p = float('inf'), max(abs(vector)) will be used as norm for normalization.
+
+    >>> v = Vectors.dense(range(3))
+    >>> nor = Normalizer(1)
+    >>> nor.transform(v)
+    DenseVector([0.0, 0.3333, 0.6667])
+
+    >>> rdd = sc.parallelize([v])
+    >>> nor.transform(rdd).collect()
+    [DenseVector([0.0, 0.3333, 0.6667])]
+
+    >>> nor2 = Normalizer(float("inf"))
+    >>> nor2.transform(v)
+    DenseVector([0.0, 0.5, 1.0])
+    """
+    def __init__(self, p=2):
+        """
+        :param p: Normalization in L^p^ space, p = 2 by default.
+        """
+        assert p >= 1.0, "p should be greater than 1.0"
+        self.p = float(p)
+
+    def transform(self, vector):
+        """
+        Applies unit length normalization on a vector.
+
+        :param vector: vector to be normalized.
+        :return: normalized vector. If the norm of the input is zero, it
+                 will return the input vector.
+        """
+        sc = SparkContext._active_spark_context
+        assert s
[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2871#discussion_r19455067 --- Diff: core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala --- @@ -21,11 +21,28 @@ import scala.util.Random import org.scalatest.{Assertions, FunSuite} -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException} +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkEnv} import org.apache.spark.io.SnappyCompressionCodec +import org.apache.spark.rdd.RDD import org.apache.spark.serializer.JavaSerializer import org.apache.spark.storage._ +// Dummy class that creates a broadcast variable but doesn't use it +class DummyBroadcastClass(rdd: RDD[Int]) extends Serializable { + @transient val list = List(1, 2, 3, 4) + val broadcast = rdd.context.broadcast(list) + val bid = broadcast.id + + def doSomething() = { +rdd.map { x => --- End diff -- this is pretty neat
[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2871#discussion_r19455054 --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala --- @@ -173,15 +175,21 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) val time = (System.nanoTime() - start) / 1e9 logInfo("Reading broadcast variable " + id + " took " + time + " s") --- End diff -- this is not your change, but we should log ms instead of s here. I will fix it later.
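rxin's note is about reporting the elapsed time in milliseconds rather than seconds. The Scala line divides a `System.nanoTime()` difference by 1e9; a minimal Python sketch of the same measurement, reported in ms, divides a nanosecond difference by 1e6 instead (the helper name `timed_ms` is hypothetical):

```python
import time


def timed_ms(fn, *args):
    """Run fn(*args) and return (result, elapsed time in milliseconds)."""
    start = time.perf_counter_ns()  # monotonic clock, like System.nanoTime()
    result = fn(*args)
    elapsed_ms = (time.perf_counter_ns() - start) / 1e6  # ns -> ms
    return result, elapsed_ms
```

A log line would then read something like `"Reading broadcast variable %d took %.3f ms" % (bid, elapsed_ms)`.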
[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2969#discussion_r19455025 --- Diff: core/src/main/scala/org/apache/spark/storage/TachyonStore.scala --- @@ -105,25 +106,17 @@ private[spark] class TachyonStore( return None } val is = file.getInStream(ReadType.CACHE) -var buffer: ByteBuffer = null +assert (is != null) try { - if (is != null) { -val size = file.length -val bs = new Array[Byte](size.asInstanceOf[Int]) -val fetchSize = is.read(bs, 0, size.asInstanceOf[Int]) -buffer = ByteBuffer.wrap(bs) -if (fetchSize != size) { - logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " + -s"is not equal to fetched size $fetchSize") - return None -} - } + val size = file.length + val bs = new Array[Byte](size.asInstanceOf[Int]) + ByteStreams.readFully(is, bs) + Some(ByteBuffer.wrap(bs)) } catch { --- End diff -- I was not suggesting that your update was inconsistent with the old implementation. I merely pointed out that this is a good place to check for more types of errors, because the caller is usually not set up to deal with OOM, but it probably is set up to deal with None.
[GitHub] spark pull request: [SPARK-4064]NioBlockTransferService.fetchBlock...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/2929
[GitHub] spark pull request: [SPARK-3795] Heuristics for dynamically scalin...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/2746#issuecomment-60715271 @andrewor14 this is looking good. All my comments are about some refactoring to make it more testable, and dealing with error conditions.
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19454983 --- Diff: python/pyspark/mllib/feature.py --- @@ -95,33 +385,26 @@ class Word2Vec(object): >>> localDoc = [sentence, sentence] >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" ")) >>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc) + >>> syms = model.findSynonyms("a", 2) ->>> str(syms[0][0]) -'b' ->>> str(syms[1][0]) -'c' ->>> len(syms) -2 +>>> [s[0] for s in syms] +[u'b', u'c'] >>> vec = model.transform("a") ->>> len(vec) -10 >>> syms = model.findSynonyms(vec, 2) ->>> str(syms[0][0]) -'b' ->>> str(syms[1][0]) -'c' ->>> len(syms) -2 +>>> [s[0] for s in syms] +[u'b', u'c'] """ def __init__(self): """ Construct Word2Vec instance """ +import random # this can't be on the top because of mllib.random + self.vectorSize = 100 self.learningRate = 0.025 self.numPartitions = 1 self.numIterations = 1 -self.seed = 42L +self.seed = random.randint(0, sys.maxint) --- End diff -- Here has nothing with numpy, so it will have problems.
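The diff replaces the fixed default seed `42L` with `random.randint(0, sys.maxint)`, so each Word2Vec instance gets a different seed unless `setSeed` is called. `sys.maxint` exists only in Python 2; the sketch below assumes Python 3 and uses `sys.maxsize` instead. The class `Word2VecParams` is hypothetical and only mirrors the defaults shown in the diff:

```python
import random
import sys


class Word2VecParams:
    """Hypothetical holder mirroring the Word2Vec defaults in the diff."""

    def __init__(self):
        self.vectorSize = 100
        self.learningRate = 0.025
        self.numPartitions = 1
        self.numIterations = 1
        # A fresh random seed per instance; sys.maxsize replaces Python 2's sys.maxint.
        self.seed = random.randint(0, sys.maxsize)

    def setSeed(self, seed):
        """Override the random default; returns self so calls can be chained."""
        self.seed = seed
        return self
```

Chaining keeps the builder style of the doctest, e.g. `Word2VecParams().setSeed(42)` pins the seed for reproducible runs.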
[GitHub] spark pull request: [SPARK-3904] [SQL] add constant objectinspecto...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/2762#issuecomment-60715086 test this please.
[GitHub] spark pull request: [SPARK-4064]NioBlockTransferService.fetchBlock...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2929#issuecomment-60715040 Thanks. I'm merging this one.
[GitHub] spark pull request: [SPARK-4064]NioBlockTransferService.fetchBlock...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2929#issuecomment-60715056 @aarondav this will conflict with your pr.
[GitHub] spark pull request: [ SPARK-1812] Adjust build system and tests to...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2615#issuecomment-60714945 [Test build #22342 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22342/consoleFull) for PR 2615 at commit [`e5786c8`](https://github.com/apache/spark/commit/e5786c8948971bd408bb6e3ebb05f72e9b75b915). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4133] [SQL] [PySpark] type conversionfo...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2973#issuecomment-60714816 [Test build #22341 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22341/consoleFull) for PR 2973 at commit [`35caa4f`](https://github.com/apache/spark/commit/35caa4fe7d87af752fb2df5bbd3e01e69784920d). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4133] [SQL] [PySpark] type conversionfo...
GitHub user davies opened a pull request: https://github.com/apache/spark/pull/2973 [SPARK-4133] [SQL] [PySpark] type conversionfor python udf Python UDFs can now be called on ArrayType/MapType/PrimitiveType values, and the returnType can also be ArrayType/MapType/PrimitiveType. A StructType value is passed to the UDF as a tuple (without attributes); if the returnType is StructType, the UDF should also return a tuple. You can merge this pull request into a Git repository by running: $ git pull https://github.com/davies/spark udf_array Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2973.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2973 commit 35caa4fe7d87af752fb2df5bbd3e01e69784920d Author: Davies Liu Date: 2014-10-28T06:18:10Z type conversionfor python udf
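The PR description says a StructType value reaches a Python UDF as a plain tuple (field names dropped), while arrays and maps keep their shape. A minimal sketch of that convention, assuming a made-up schema encoding (the string `"primitive"`, or tuples `("array", t)`, `("map", k, v)`, `("struct", [(name, t), ...])`) rather than Spark SQL's real `DataType` classes; `to_python` is hypothetical:

```python
def to_python(value, dtype):
    """Convert `value` for a UDF according to a hypothetical schema `dtype`.

    dtype is one of:
      "primitive"                      -> value passed through unchanged
      ("array", elem_type)             -> list
      ("map", key_type, value_type)    -> dict
      ("struct", [(name, type), ...])  -> tuple in field order, names dropped
    """
    if value is None or dtype == "primitive":
        return value
    kind = dtype[0]
    if kind == "array":
        return [to_python(v, dtype[1]) for v in value]
    if kind == "map":
        return {to_python(k, dtype[1]): to_python(v, dtype[2]) for k, v in value.items()}
    if kind == "struct":
        # Struct fields become positional tuple entries, as the PR describes.
        return tuple(to_python(value[name], t) for name, t in dtype[1])
    raise ValueError("unknown type: %r" % (dtype,))
```

A UDF declared with a StructType returnType would then build and return such a tuple itself, in field order.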
[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/2969#discussion_r19454860 --- Diff: core/src/main/scala/org/apache/spark/storage/TachyonStore.scala --- @@ -105,25 +106,17 @@ private[spark] class TachyonStore( return None } val is = file.getInStream(ReadType.CACHE) -var buffer: ByteBuffer = null +assert (is != null) try { - if (is != null) { -val size = file.length -val bs = new Array[Byte](size.asInstanceOf[Int]) -val fetchSize = is.read(bs, 0, size.asInstanceOf[Int]) -buffer = ByteBuffer.wrap(bs) -if (fetchSize != size) { - logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " + -s"is not equal to fetched size $fetchSize") - return None -} - } + val size = file.length + val bs = new Array[Byte](size.asInstanceOf[Int]) + ByteStreams.readFully(is, bs) + Some(ByteBuffer.wrap(bs)) } catch { --- End diff -- I think that catching OutOfMemoryError is inconsistent with the rest of the code base. The only places where we catch it are for the purposes of logging more information about why an executor died. I don't think that any of the changes that I've made here are inconsistent with the old implementation: if we fail to fetch, we still return None, since ByteStreams.readFully throws IOException when it encounters errors. Besides, couldn't the OOM have occurred due to some other thread starving this one of memory?
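The old TachyonStore code issued a single `is.read(bs, 0, size)` and treated a short count as failure, but one read call may legitimately return fewer bytes than requested. Guava's `ByteStreams.readFully` instead loops until the buffer is full and throws `EOFException` (an `IOException`) if the stream ends early. A Python sketch of that contract, assuming a blocking stream; `read_fully` and the test stream `ChunkyStream` are hypothetical:

```python
import io


def read_fully(stream, size):
    """Read exactly `size` bytes from a blocking `stream`, looping over short reads."""
    buf = bytearray(size)
    view = memoryview(buf)
    filled = 0
    while filled < size:
        n = stream.readinto(view[filled:])
        if not n:  # blocking stream assumed: 0 means end of stream
            raise EOFError("expected %d bytes, got %d" % (size, filled))
        filled += n
    return bytes(buf)


class ChunkyStream(io.RawIOBase):
    """Test stream whose readinto() returns at most `chunk` bytes per call."""

    def __init__(self, data, chunk=2):
        super().__init__()
        self._data, self._pos, self._chunk = data, 0, chunk

    def readable(self):
        return True

    def readinto(self, b):
        n = min(len(b), self._chunk, len(self._data) - self._pos)
        b[:n] = self._data[self._pos:self._pos + n]
        self._pos += n
        return n
```

With `ChunkyStream`, a single `read(5)` returns only 2 bytes, which is exactly the short read the old code misreported as a failed fetch; `read_fully` keeps reading until the buffer is full.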
[GitHub] spark pull request: [SPARK-3343] [SQL] Add serde support for CTAS
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2570#issuecomment-60714742 [Test build #22337 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22337/consoleFull) for PR 2570 at commit [`e011ef5`](https://github.com/apache/spark/commit/e011ef557be3f438c5052e65462d5fbb89b51b6d). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `case class CreateTableAsSelect[T](` * ` logDebug("Found class for $serdeName")`
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454844 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.rdd + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.rdd.BlockRDD +import org.apache.spark.storage.{BlockId, StorageLevel} +import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader} +import org.apache.spark._ --- End diff -- import out of order
[GitHub] spark pull request: [SPARK-3343] [SQL] Add serde support for CTAS
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2570#issuecomment-60714744 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22337/ Test PASSed.
[GitHub] spark pull request: [SPARK-3795] Heuristics for dynamically scalin...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/2746#discussion_r19454814 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import scala.collection.mutable + +import org.apache.spark.scheduler._ + +/** + * An agent that dynamically allocates and removes executors based on the workload. + * + * The add policy depends on whether there are backlogged tasks waiting to be scheduled. If + * the scheduler queue is not drained in N seconds, then new executors are added. If the queue + * persists for another M seconds, then more executors are added and so on. The number added + * in each round increases exponentially from the previous round until an upper bound on the + * number of executors has been reached. + * + * The rationale for the exponential increase is twofold: (1) Executors should be added slowly + * in the beginning in case the number of extra executors needed turns out to be small. Otherwise, + * we may add more executors than we need just to remove them later. 
(2) Executors should be added + * quickly over time in case the maximum number of executors is very high. Otherwise, it will take + * a long time to ramp up under heavy workloads. + * + * The remove policy is simpler: If an executor has been idle for K seconds, meaning it has not + * been scheduled to run any tasks, then it is removed. + * + * There is no retry logic in either case because we make the assumption that the cluster manager + * will eventually fulfill all requests it receives asynchronously. + * + * The relevant Spark properties include the following: + * + * spark.dynamicAllocation.enabled - Whether this feature is enabled + * spark.dynamicAllocation.minExecutors - Lower bound on the number of executors + * spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors + * + * spark.dynamicAllocation.schedulerBacklogTimeout (M) - + * If there are backlogged tasks for this duration, add new executors + * + * spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N) - + * If the backlog is sustained for this duration, add more executors + * This is used only after the initial backlog timeout is exceeded + * + * spark.dynamicAllocation.executorIdleTimeout (K) - + * If an executor has been idle for this duration, remove it + */ +private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging { + import ExecutorAllocationManager._ + + private val conf = sc.conf + + // Lower and upper bounds on the number of executors. These are required. 
+ private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1) + private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1) + if (minNumExecutors < 0 || maxNumExecutors < 0) { +throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!") + } + if (minNumExecutors > maxNumExecutors) { +throw new SparkException("spark.dynamicAllocation.minExecutors must " + + "be less than or equal to spark.dynamicAllocation.maxExecutors!") + } + + // How long there must be backlogged tasks for before an addition is triggered + private val schedulerBacklogTimeout = conf.getLong( +"spark.dynamicAllocation.schedulerBacklogTimeout", 60) + + // Same as above, but used only after `schedulerBacklogTimeout` is exceeded + private val sustainedSchedulerBacklogTimeout = conf.getLong( +"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout) + + // How long an executor must be idle for before it is removed + private val removeThresholdSeconds = conf.getLong( +"spark.dynamicAllocation.executorIdleTimeout", 600) + +
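The doc comment describes an add policy in which each expired backlog timeout requests more executors than the previous round, growing exponentially until `maxExecutors` is reached, together with the min/max validation shown in the constructor. A hypothetical Python model of that policy (not Spark's implementation; doubling is assumed as the growth factor, and `ExecutorTargetSketch` is a made-up name):

```python
class ExecutorTargetSketch:
    """Hypothetical model of the exponential add policy described above."""

    def __init__(self, min_executors, max_executors):
        # Same checks as the diff: both bounds required, min <= max.
        if min_executors < 0 or max_executors < 0:
            raise ValueError("min/max executors must be set")
        if min_executors > max_executors:
            raise ValueError("min executors must be <= max executors")
        self.max_executors = max_executors
        self.target = min_executors
        self.to_add = 1  # executors to request in the next round

    def on_backlog_timeout(self):
        """Called each time a (sustained) backlog timeout expires; returns number added."""
        added = min(self.to_add, self.max_executors - self.target)
        self.target += added
        # Double the next request while below the cap; reset once capped.
        self.to_add = self.to_add * 2 if added > 0 else 1
        return added

    def on_backlog_cleared(self):
        """Queue drained: restart the ramp from 1 next time."""
        self.to_add = 1
```

With `min_executors=0` and `max_executors=10`, successive timeouts add 1, 2, 4, then 3 executors (capped), and nothing after that. Starting small avoids over-allocating for short bursts, while doubling still reaches a high cap in a few rounds, which is the twofold rationale the comment gives.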
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454823 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.rdd + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.rdd.BlockRDD +import org.apache.spark.storage.{BlockId, StorageLevel} +import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader} +import org.apache.spark._ + +private[streaming] +class HDFSBackedBlockRDDPartition( +val blockId: BlockId, +val index: Int, +val segment: WriteAheadLogFileSegment + ) extends Partition + +private[streaming] +class HDFSBackedBlockRDD[T: ClassTag]( --- End diff -- It would be great to add javadoc explaining what this class is for. If it is used for recovery, why should we put the blocks in block manager after using them? Shouldn't recovery data be used only once during a recovery?
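rxin's question is what the RDD is for and whether recovered blocks should be written back to the block manager. Assuming the intent is to serve a block from the block manager when present and otherwise read its bytes from the write-ahead-log segment (a file path, offset, and length), a Python sketch might look like this; `FileSegment`, `read_block`, the dict standing in for the block manager, and the `store_back` flag are all hypothetical:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileSegment:
    """Hypothetical stand-in for WriteAheadLogFileSegment: where a block's bytes live."""
    path: str
    offset: int
    length: int


def read_block(block_id, segment, cache, store_back=False):
    """Return bytes for `block_id` from `cache`, falling back to the file segment."""
    data = cache.get(block_id)
    if data is not None:
        return data
    with open(segment.path, "rb") as f:
        f.seek(segment.offset)
        data = f.read(segment.length)  # buffered read: stops early only at EOF
    if len(data) != segment.length:
        raise EOFError("segment truncated for %s" % block_id)
    if store_back:
        # The design question from the review: cache recovered data or not.
        cache[block_id] = data
    return data
```

Keeping `store_back` as an explicit switch makes the trade-off raised in the review visible: whether data read once during recovery is worth caching at all.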
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454808 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.rdd + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.rdd.BlockRDD +import org.apache.spark.storage.{BlockId, StorageLevel} +import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader} +import org.apache.spark._ + +private[streaming] +class HDFSBackedBlockRDDPartition( +val blockId: BlockId, +val index: Int, +val segment: WriteAheadLogFileSegment + ) extends Partition + +private[streaming] +class HDFSBackedBlockRDD[T: ClassTag]( --- End diff -- I don't think it makes sense to tie this to WriteAheadLogFileSegment. On one hand the naming HDFSBackedBlockRDD is supposed to be general, on the other you tie it to recovery through the use of WriteAheadLogFileSegment.
[GitHub] spark pull request: [ SPARK-1812] Adjust build system and tests to...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2615#issuecomment-60714610 [Test build #22340 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22340/consoleFull) for PR 2615 at commit [`06a82b9`](https://github.com/apache/spark/commit/06a82b961f5ba08669b28810aa4dcae001428d8c). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2871#issuecomment-60714605 [Test build #22335 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22335/consoleFull) for PR 2871 at commit [`d6c5ee9`](https://github.com/apache/spark/commit/d6c5ee9629e4bc894913a4d181ca8ed963472e49). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-4031] Make torrent broadcast read block...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2871#issuecomment-60714608 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22335/ Test PASSed.
[GitHub] spark pull request: [ SPARK-1812] Adjust build system and tests to...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2615#issuecomment-60714611 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22340/ Test FAILed.
[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2969#discussion_r19454743 --- Diff: core/src/main/scala/org/apache/spark/storage/TachyonStore.scala --- @@ -105,25 +106,17 @@ private[spark] class TachyonStore( return None } val is = file.getInStream(ReadType.CACHE) -var buffer: ByteBuffer = null +assert (is != null) try { - if (is != null) { -val size = file.length -val bs = new Array[Byte](size.asInstanceOf[Int]) -val fetchSize = is.read(bs, 0, size.asInstanceOf[Int]) -buffer = ByteBuffer.wrap(bs) -if (fetchSize != size) { - logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " + -s"is not equal to fetched size $fetchSize") - return None -} - } + val size = file.length + val bs = new Array[Byte](size.asInstanceOf[Int]) + ByteStreams.readFully(is, bs) + Some(ByteBuffer.wrap(bs)) } catch { --- End diff -- because we are supposed to return None if this fails ... it is pretty easy to recover from OOM here, if the OOM is due to the creation of the array.
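rxin and JoshRosen disagree on whether the TachyonStore read should also turn an out-of-memory failure into `None`. A Python sketch of the "return None on any read failure" option rxin describes; `get_bytes_or_none` is hypothetical, and Python's `MemoryError` only loosely corresponds to the JVM's `OutOfMemoryError`:

```python
from typing import Callable, Optional


def get_bytes_or_none(read: Callable[[int], bytes], size: int) -> Optional[bytes]:
    """Hypothetical wrapper: return read(size), or None if reading or allocation fails."""
    try:
        return read(size)
    except (OSError, EOFError, MemoryError):
        # Callers are expected to handle None, not an exception.
        return None
```

JoshRosen's objection applies directly to this design: if the memory pressure comes from another thread, swallowing the error here hides the real cause, which is why the rest of the code base only catches OOM to log it.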
[GitHub] spark pull request: [ SPARK-1812] Adjust build system and tests to...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2615#issuecomment-60714433 [Test build #22340 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22340/consoleFull) for PR 2615 at commit [`06a82b9`](https://github.com/apache/spark/commit/06a82b961f5ba08669b28810aa4dcae001428d8c).
* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3954][Streaming] promote the speed of c...
Github user surq commented on the pull request: https://github.com/apache/spark/pull/2811#issuecomment-60714301 @tdas This patch has been open for several days. If you have time, please take a look at it. @jerryshao Thanks for your kind help.
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454664 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.rdd + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.rdd.BlockRDD +import org.apache.spark.storage.{BlockId, StorageLevel} +import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader} +import org.apache.spark._ + +private[streaming] +class HDFSBackedBlockRDDPartition( +val blockId: BlockId, +val index: Int, +val segment: WriteAheadLogFileSegment + ) extends Partition + +private[streaming] +class HDFSBackedBlockRDD[T: ClassTag]( +@transient sc: SparkContext, +@transient hadoopConfiguration: Configuration, +@transient override val blockIds: Array[BlockId], +@transient val segments: Array[WriteAheadLogFileSegment], +val storeInBlockManager: Boolean, +val storageLevel: StorageLevel + ) extends BlockRDD[T](sc, blockIds) { + + require(blockIds.length == segments.length, +"Number of block ids must be the same as number of segments!") + + // Hadoop Configuration is not serializable, so broadcast it as a serializable. + val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration)) + + override def getPartitions: Array[Partition] = { +assertValid() +(0 until blockIds.size).map { i => --- End diff -- Array.tabulate
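The `Array.tabulate` suggestion replaces `(0 until blockIds.size).map { ... }.toArray`: `tabulate` allocates the array once and fills slot `i` from the index, so there is no intermediate collection and no trailing `.toArray`. A minimal sketch, where `Partition` and `SegmentPartition` are hypothetical stand-ins for `org.apache.spark.Partition` and `HDFSBackedBlockRDDPartition`:

```scala
object TabulateSketch {
  // Hypothetical stand-ins for the Spark partition types in the diff.
  trait Partition { def index: Int }
  final case class SegmentPartition(blockId: String, index: Int, segment: String) extends Partition

  def getPartitions(blockIds: Array[String], segments: Array[String]): Array[Partition] = {
    require(blockIds.length == segments.length,
      "Number of block ids must be the same as number of segments!")
    // The explicit [Partition] type argument makes the result an Array[Partition]
    // rather than an Array[SegmentPartition].
    Array.tabulate[Partition](blockIds.length) { i =>
      SegmentPartition(blockIds(i), i, segments(i))
    }
  }
}
```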
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454662 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.rdd + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.rdd.BlockRDD +import org.apache.spark.storage.{BlockId, StorageLevel} +import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader} +import org.apache.spark._ + +private[streaming] +class HDFSBackedBlockRDDPartition( +val blockId: BlockId, +val index: Int, +val segment: WriteAheadLogFileSegment + ) extends Partition + +private[streaming] +class HDFSBackedBlockRDD[T: ClassTag]( +@transient sc: SparkContext, +@transient hadoopConfiguration: Configuration, +@transient override val blockIds: Array[BlockId], +@transient val segments: Array[WriteAheadLogFileSegment], +val storeInBlockManager: Boolean, +val storageLevel: StorageLevel + ) extends BlockRDD[T](sc, blockIds) { + + require(blockIds.length == segments.length, +"Number of block ids must be the same as number of segments!") + + // Hadoop Configuration is not serializable, so broadcast it as a serializable. + val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration)) --- End diff -- can this be private[this]?
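In the diff, `broadcastedHadoopConf` is a public `val`, so any caller holding the RDD can reach it. `private[this]` (Scala 2 syntax) restricts a member to the enclosing instance itself; even another instance of the same class cannot read it. A small sketch with a hypothetical `ConfHolder` class:

```scala
object PrivateThisSketch {
  // Hypothetical class that keeps an internal handle it never exposes directly.
  final class ConfHolder(settings: Map[String, String]) {
    // Object-private: only this instance can refer to `snapshot`.
    // `other.snapshot` on a different ConfHolder would not compile.
    private[this] val snapshot: Map[String, String] = settings
    def get(key: String): Option[String] = snapshot.get(key)
  }
}
```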
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454635 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.rdd + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.rdd.BlockRDD +import org.apache.spark.storage.{BlockId, StorageLevel} +import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader} +import org.apache.spark._ + +private[streaming] +class HDFSBackedBlockRDDPartition( +val blockId: BlockId, +val index: Int, +val segment: WriteAheadLogFileSegment + ) extends Partition + +private[streaming] +class HDFSBackedBlockRDD[T: ClassTag]( +@transient sc: SparkContext, +@transient hadoopConfiguration: Configuration, +@transient override val blockIds: Array[BlockId], --- End diff -- i don't think u need to declare this a field. it's already a field in the parent class and you can just use that?
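The point is that `BlockRDD` already owns a `blockIds` field, so the subclass only needs to forward its constructor argument to the parent and can then read the inherited member. A minimal sketch with hypothetical `BaseRDD` and `SegmentRDD` classes; the parameter is named `ids` here purely so that `blockIds` inside the subclass unambiguously refers to the inherited field:

```scala
object ParentFieldSketch {
  // Hypothetical stand-in for BlockRDD, which already declares `val blockIds`.
  class BaseRDD(val blockIds: Array[String])

  // No `override val`: the argument is just passed to the parent constructor.
  class SegmentRDD(ids: Array[String], val segments: Array[String]) extends BaseRDD(ids) {
    require(blockIds.length == segments.length)
    def numPartitions: Int = blockIds.length // reads the field inherited from BaseRDD
  }
}
```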
[GitHub] spark pull request: [SPARK-3822] Executor scaling mechanism for Ya...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/2840#discussion_r19454618
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1501,8 +1533,13 @@ object SparkContext extends Logging {
     res
   }
 
-  /** Creates a task scheduler based on a given master URL. Extracted for testing. */
-  private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
+  /**
+   * Create a task scheduler based on a given master URL.
+   * Return a 2-tuple of the scheduler backend and the task scheduler.
+   */
+  private def createTaskScheduler(
+      sc: SparkContext,
+      master: String): (SchedulerBackend, TaskScheduler) = {
--- End diff --

Okay - sounds good
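The new signature returns both objects as a pair, so the caller can keep a reference to the backend as well as the scheduler and unpack them with a tuple pattern. A sketch under the assumption of heavily simplified, hypothetical `SchedulerBackend`/`TaskScheduler` traits (the real Spark traits are much larger):

```scala
object SchedulerTupleSketch {
  // Hypothetical, minimal stand-ins for the Spark scheduler traits.
  trait SchedulerBackend { def name: String }
  trait TaskScheduler { def backend: SchedulerBackend }

  // Returns (backend, scheduler) instead of only the scheduler.
  def createTaskScheduler(master: String): (SchedulerBackend, TaskScheduler) = {
    val b: SchedulerBackend = new SchedulerBackend { val name = s"backend-for-$master" }
    val s: TaskScheduler = new TaskScheduler { val backend = b }
    (b, s)
  }
}
```

A caller would then write `val (backend, scheduler) = createTaskScheduler(master)`.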
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454599 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.rdd + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.rdd.BlockRDD +import org.apache.spark.storage.{BlockId, StorageLevel} +import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader} +import org.apache.spark._ + +private[streaming] +class HDFSBackedBlockRDDPartition( +val blockId: BlockId, +val index: Int, +val segment: WriteAheadLogFileSegment + ) extends Partition + +private[streaming] +class HDFSBackedBlockRDD[T: ClassTag]( +@transient sc: SparkContext, +@transient hadoopConfiguration: Configuration, +@transient override val blockIds: Array[BlockId], +@transient val segments: Array[WriteAheadLogFileSegment], +val storeInBlockManager: Boolean, +val storageLevel: StorageLevel + ) extends BlockRDD[T](sc, blockIds) { + + require(blockIds.length == segments.length, +"Number of block ids must be the same as number of segments!") + + // Hadoop Configuration is not serializable, so broadcast it as a serializable. + val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration)) + + override def getPartitions: Array[Partition] = { +assertValid() +(0 until blockIds.size).map { i => + new HDFSBackedBlockRDDPartition(blockIds(i), i, segments(i)) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[T] = { +assertValid() +val hadoopConf = broadcastedHadoopConf.value.value +val blockManager = SparkEnv.get.blockManager +val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition] +val blockId = partition.blockId +blockManager.get(blockId) match { + // Data is in Block Manager, grab it from there. 
+ case Some(block) => +block.data.asInstanceOf[Iterator[T]] + // Data not found in Block Manager, grab it from HDFS + case None => +logInfo("Reading partition data from write ahead log " + partition.segment.path) +val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf) +val dataRead = reader.read(partition.segment) +reader.close() +// Currently, we support storing the data to BM only in serialized form and not in +// deserialized form +if (storeInBlockManager) { + blockManager.putBytes(blockId, dataRead, storageLevel) +} +dataRead.rewind() +blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] +} + } + + override def getPreferredLocations(split: Partition): Seq[String] = { +val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition] +val locations = getBlockIdLocations() +locations.getOrElse(partition.blockId, --- End diff -- these few lines of getOrElse's are way too complicated to use getOrElse. For the outer most layer, just create an if/else to make it more clear.
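The quoted code is cut off at `locations.getOrElse(partition.blockId,`, so the actual fallback is not visible here. Assuming, for illustration only, that the fallback consults the hosts of the write-ahead-log segment, the outermost lookup can be written as a plain if/else as suggested. `PreferredLocationsSketch`, `blockManagerLocations`, and `segmentHosts` are hypothetical names:

```scala
object PreferredLocationsSketch {
  // `blockManagerLocations` plays the role of getBlockIdLocations();
  // `segmentHosts` is a hypothetical lookup of where the HDFS segment lives.
  def preferredLocations(
      blockId: String,
      blockManagerLocations: Map[String, Seq[String]],
      segmentHosts: String => Seq[String]): Seq[String] = {
    val cached = blockManagerLocations.getOrElse(blockId, Seq.empty)
    if (cached.nonEmpty) {
      cached                // a block manager holds the block: prefer those hosts
    } else {
      segmentHosts(blockId) // otherwise fall back to the write-ahead-log location
    }
  }
}
```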
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454583 --- Diff: streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala --- @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.rdd + +import java.io.File --- End diff -- imports out of order
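In the suite's import block, `org.apache.spark.{SparkConf, SparkContext}` sits between the `java.*` and `scala.*` imports, and the third-party and Spark imports are interleaved. One possible ordering is shown below. It assumes the grouping commonly used in Spark (java/javax, then scala, then third-party libraries, then `org.apache.spark`) with names sorted inside braces. It is a fragment of the test file, not a standalone program:

```scala
import java.io.File
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable.ArrayBuffer

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
```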
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454568 --- Diff: streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala --- @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.rdd + +import java.io.File +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.spark.{SparkConf, SparkContext} + +import scala.collection.mutable.ArrayBuffer +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} + +import com.google.common.io.Files +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, StreamBlockId} +import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter} + +class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll { + val conf = new SparkConf() +.setMaster("local[2]") +.setAppName(this.getClass.getSimpleName) + val hadoopConf = new Configuration() + // Since the same BM is reused in all tests, use an atomic int to generate ids + val idGenerator = new AtomicInteger(0) + + var sparkContext: SparkContext = null + var blockManager: BlockManager = null + var file: File = null + var dir: File = null + + before { +blockManager = sparkContext.env.blockManager +dir = Files.createTempDir() +file = new File(dir, "BlockManagerWrite") + } + + after { +file.delete() +dir.delete() + } + + override def beforeAll(): Unit = { +sparkContext = new SparkContext(conf) + } + + override def afterAll(): Unit = { +// Copied from LocalSparkContext which can't be imported since spark-core test-jar does not +// get imported properly by sbt even if it is created. +sparkContext.stop() +System.clearProperty("spark.driver.port") + } + + test("Data available in BM and HDFS") { +testHDFSBackedRDD(5, 5, 20, 5) + } + + test("Data available in in BM but not in HDFS") { +testHDFSBackedRDD(5, 0, 20, 5) + } + + test("Data available in in HDFS and not in BM") { +testHDFSBackedRDD(0, 5, 20, 5) + } + + test("Data partially available in BM, and the rest in HDFS") { +testHDFSBackedRDD(3, 2, 20, 5) + } + + /** + * Write a bunch of events into the HDFS Block RDD. 
Put a part of all of them to the + * BlockManager, so all reads need not happen from HDFS. + * @param total Total number of Strings to write + * @param blockCount Number of blocks to write (therefore, total # of events per block = + * total/blockCount + */ + private def testHDFSBackedRDD( --- End diff -- basically i'm worried the test case is too complicated for others to understand without enough comment, and it will be modified incorrectly in the future.
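One way to make the helper easier to document is to state which store each block ends up in. The sketch below restates the placement logic of the `testHDFSBackedRDD` body quoted in the review comments on this file as a pure function. Blocks `0 until writeToBMCount` go to the BlockManager. When `writeToHDFSCount != 0`, the remaining blocks are written to HDFS; otherwise they get fake segments. `TestPlanSketch`, `BlockPlan`, and `plan` are hypothetical names, not part of the PR:

```scala
object TestPlanSketch {
  // Where one block's events live, as implied by the quoted helper body.
  final case class BlockPlan(index: Int, events: Int, inBlockManager: Boolean, inWriteAheadLog: Boolean)

  def plan(writeToBMCount: Int, writeToHDFSCount: Int, total: Int, blockCount: Int): Seq[BlockPlan] = {
    val countPerBlock = total / blockCount
    (0 until blockCount).map { i =>
      val inBM = i < writeToBMCount
      // Only blocks from index writeToBMCount onward are written to HDFS, and only when
      // writeToHDFSCount != 0; the helper generates fake segments for all other blocks.
      val inWAL = writeToHDFSCount != 0 && i >= writeToBMCount
      BlockPlan(i, countPerBlock, inBM, inWAL)
    }
  }
}
```

Tracing the four test cases this way surfaces a detail a clearer doc comment should mention. As the quoted body reads, `writeToHDFSCount` acts only as an on/off switch, so `testHDFSBackedRDD(5, 5, 20, 5)` places all five blocks in the BlockManager and writes none to HDFS.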
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454543 --- Diff: streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala --- @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.rdd + +import java.io.File +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.spark.{SparkConf, SparkContext} + +import scala.collection.mutable.ArrayBuffer +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} + +import com.google.common.io.Files +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, StreamBlockId} +import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter} + +class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll { + val conf = new SparkConf() +.setMaster("local[2]") +.setAppName(this.getClass.getSimpleName) + val hadoopConf = new Configuration() + // Since the same BM is reused in all tests, use an atomic int to generate ids + val idGenerator = new AtomicInteger(0) + + var sparkContext: SparkContext = null + var blockManager: BlockManager = null + var file: File = null + var dir: File = null + + before { +blockManager = sparkContext.env.blockManager +dir = Files.createTempDir() +file = new File(dir, "BlockManagerWrite") + } + + after { +file.delete() +dir.delete() + } + + override def beforeAll(): Unit = { +sparkContext = new SparkContext(conf) + } + + override def afterAll(): Unit = { +// Copied from LocalSparkContext which can't be imported since spark-core test-jar does not +// get imported properly by sbt even if it is created. +sparkContext.stop() +System.clearProperty("spark.driver.port") + } + + test("Data available in BM and HDFS") { +testHDFSBackedRDD(5, 5, 20, 5) + } + + test("Data available in in BM but not in HDFS") { +testHDFSBackedRDD(5, 0, 20, 5) + } + + test("Data available in in HDFS and not in BM") { +testHDFSBackedRDD(0, 5, 20, 5) + } + + test("Data partially available in BM, and the rest in HDFS") { +testHDFSBackedRDD(3, 2, 20, 5) + } + + /** + * Write a bunch of events into the HDFS Block RDD. 
Put a part of all of them to the + * BlockManager, so all reads need not happen from HDFS. + * @param total Total number of Strings to write + * @param blockCount Number of blocks to write (therefore, total # of events per block = + * total/blockCount + */ + private def testHDFSBackedRDD( + writeToBMCount: Int, + writeToHDFSCount: Int, + total: Int, + blockCount: Int +) { +val countPerBlock = total / blockCount +val blockIds = (0 until blockCount).map { i => +StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet()) +} + +val writtenStrings = generateData(total, countPerBlock) + +if (writeToBMCount != 0) { + (0 until writeToBMCount).foreach { i => +blockManager + .putIterator(blockIds(i), writtenStrings(i).iterator, StorageLevel.MEMORY_ONLY_SER) + } +} + +val segments = { + if (writeToHDFSCount != 0) { +// Generate some fake segments for the blocks in BM so the RDD does not complain +generateFakeSegments(writeToBMCount) ++ + writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount), +blockIds.slice(writeToBMCount, blockCount)) + } else { +generateFakeSegments(blockCount) + } +} + +val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf,
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454541 --- Diff: streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala --- @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.rdd + +import java.io.File +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.spark.{SparkConf, SparkContext} + +import scala.collection.mutable.ArrayBuffer +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} + +import com.google.common.io.Files +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, StreamBlockId} +import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter} + +class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll { + val conf = new SparkConf() +.setMaster("local[2]") +.setAppName(this.getClass.getSimpleName) + val hadoopConf = new Configuration() + // Since the same BM is reused in all tests, use an atomic int to generate ids + val idGenerator = new AtomicInteger(0) + + var sparkContext: SparkContext = null + var blockManager: BlockManager = null + var file: File = null + var dir: File = null + + before { +blockManager = sparkContext.env.blockManager +dir = Files.createTempDir() +file = new File(dir, "BlockManagerWrite") + } + + after { +file.delete() +dir.delete() + } + + override def beforeAll(): Unit = { +sparkContext = new SparkContext(conf) + } + + override def afterAll(): Unit = { +// Copied from LocalSparkContext which can't be imported since spark-core test-jar does not +// get imported properly by sbt even if it is created. +sparkContext.stop() +System.clearProperty("spark.driver.port") + } + + test("Data available in BM and HDFS") { +testHDFSBackedRDD(5, 5, 20, 5) + } + + test("Data available in in BM but not in HDFS") { +testHDFSBackedRDD(5, 0, 20, 5) + } + + test("Data available in in HDFS and not in BM") { +testHDFSBackedRDD(0, 5, 20, 5) + } + + test("Data partially available in BM, and the rest in HDFS") { +testHDFSBackedRDD(3, 2, 20, 5) + } + + /** + * Write a bunch of events into the HDFS Block RDD. 
Put a part of all of them to the + * BlockManager, so all reads need not happen from HDFS. + * @param total Total number of Strings to write + * @param blockCount Number of blocks to write (therefore, total # of events per block = + * total/blockCount + */ + private def testHDFSBackedRDD( + writeToBMCount: Int, + writeToHDFSCount: Int, + total: Int, + blockCount: Int +) { +val countPerBlock = total / blockCount +val blockIds = (0 until blockCount).map { i => +StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet()) +} + +val writtenStrings = generateData(total, countPerBlock) + +if (writeToBMCount != 0) { + (0 until writeToBMCount).foreach { i => +blockManager + .putIterator(blockIds(i), writtenStrings(i).iterator, StorageLevel.MEMORY_ONLY_SER) + } +} + +val segments = { + if (writeToHDFSCount != 0) { +// Generate some fake segments for the blocks in BM so the RDD does not complain +generateFakeSegments(writeToBMCount) ++ + writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount), +blockIds.slice(writeToBMCount, blockCount)) + } else { +generateFakeSegments(blockCount) + } +} + +val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf,
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454523 --- Diff: streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala --- @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.rdd + +import java.io.File +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.spark.{SparkConf, SparkContext} + +import scala.collection.mutable.ArrayBuffer +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} + +import com.google.common.io.Files +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.storage.{BlockManager, BlockId, StorageLevel, StreamBlockId} +import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter} + +class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll { + val conf = new SparkConf() +.setMaster("local[2]") +.setAppName(this.getClass.getSimpleName) + val hadoopConf = new Configuration() + // Since the same BM is reused in all tests, use an atomic int to generate ids + val idGenerator = new AtomicInteger(0) + + var sparkContext: SparkContext = null + var blockManager: BlockManager = null + var file: File = null + var dir: File = null + + before { +blockManager = sparkContext.env.blockManager +dir = Files.createTempDir() +file = new File(dir, "BlockManagerWrite") + } + + after { +file.delete() +dir.delete() + } + + override def beforeAll(): Unit = { +sparkContext = new SparkContext(conf) + } + + override def afterAll(): Unit = { +// Copied from LocalSparkContext which can't be imported since spark-core test-jar does not +// get imported properly by sbt even if it is created. +sparkContext.stop() +System.clearProperty("spark.driver.port") + } + + test("Data available in BM and HDFS") { +testHDFSBackedRDD(5, 5, 20, 5) + } + + test("Data available in in BM but not in HDFS") { +testHDFSBackedRDD(5, 0, 20, 5) + } + + test("Data available in in HDFS and not in BM") { +testHDFSBackedRDD(0, 5, 20, 5) + } + + test("Data partially available in BM, and the rest in HDFS") { +testHDFSBackedRDD(3, 2, 20, 5) + } + + /** + * Write a bunch of events into the HDFS Block RDD. 
Put a part of all of them to the + * BlockManager, so all reads need not happen from HDFS. + * @param total Total number of Strings to write + * @param blockCount Number of blocks to write (therefore, total # of events per block = + * total/blockCount + */ + private def testHDFSBackedRDD( --- End diff -- you should explain more clearly what this test function does. it is long enough that it is no longer obvious, and your comment doesn't really address that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
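One way to make the helper's contract explicit, as requested above, is to state exactly which blocks land in which store. The sketch below is a pure-Scala model, not the suite's code. It assumes, inferring only from the four test cases, that the first `writeToBMCount` blocks go to the BlockManager, the last `writeToHDFSCount` blocks go to the write-ahead log on HDFS, and every block has at least one copy. `BlockPlacement`, `BlockPlan` and `planBlocks` are hypothetical names.

```scala
/** Hypothetical model of how testHDFSBackedRDD might split blocks between stores. */
object BlockPlacement {

  /** Where one block lives under the assumed layout, plus the events it holds. */
  final case class BlockPlan(index: Int, inBlockManager: Boolean, inHDFS: Boolean, events: Seq[String])

  /**
   * Splits `total` strings into `blockCount` blocks of `total / blockCount` events each.
   * Assumption: blocks [0, writeToBMCount) are put in the BlockManager and blocks
   * [blockCount - writeToHDFSCount, blockCount) are written to HDFS, so the two ranges
   * overlap whenever writeToBMCount + writeToHDFSCount > blockCount.
   */
  def planBlocks(
      writeToBMCount: Int,
      writeToHDFSCount: Int,
      total: Int,
      blockCount: Int): Seq[BlockPlan] = {
    require(writeToBMCount + writeToHDFSCount >= blockCount, "every block needs at least one copy")
    val countPerBlock = total / blockCount
    val events = (0 until total).map(_.toString)
    Seq.tabulate(blockCount) { i =>
      BlockPlan(
        index = i,
        inBlockManager = i < writeToBMCount,
        inHDFS = i >= blockCount - writeToHDFSCount,
        events = events.slice(i * countPerBlock, (i + 1) * countPerBlock))
    }
  }
}
```

Under this reading, `planBlocks(3, 2, 20, 5)` puts blocks 0 to 2 only in the BlockManager and blocks 3 and 4 only on HDFS, each with 4 events, which matches the "partially available in BM, and the rest in HDFS" case. The signature also keeps `): Seq[BlockPlan] = {` on the last parameter line.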
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454515 --- Diff: streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala --- @@ -0,0 +1,163 @@ ... + /** + * Write a bunch of events into the HDFS Block RDD. Put a part of all of them to the --- End diff -- what do u mean by "a part of all of them"?
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454502 --- Diff: streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala --- @@ -0,0 +1,163 @@ ... + private def testHDFSBackedRDD( + writeToBMCount: Int, + writeToHDFSCount: Int, + total: Int, + blockCount: Int +) { --- End diff -- move this to the previous line?
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454508 --- Diff: streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala --- @@ -0,0 +1,163 @@ ... + private def testHDFSBackedRDD( + writeToBMCount: Int, + writeToHDFSCount: Int, + total: Int, + blockCount: Int +) { +val countPerBlock = total / blockCount +val blockIds = (0 until blockCount).map { i => --- End diff -- Use Array.tabulate instead
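Applied to the block-id loop, the `Array.tabulate` suggestion might look like the sketch below. `Array.tabulate(n)(f)` builds an `Array` of length `n` from `f(0)` through `f(n - 1)`, so it yields an array directly instead of the `IndexedSeq` produced by `(0 until blockCount).map`. To keep it runnable without Spark, `StreamBlockId` here is a local stand-in case class rather than Spark's `org.apache.spark.storage.StreamBlockId`, and `makeBlockIds` is a hypothetical helper.

```scala
import java.util.concurrent.atomic.AtomicInteger

object BlockIdsSketch {
  // Local stand-in for Spark's StreamBlockId so the sketch compiles without Spark.
  final case class StreamBlockId(streamId: Int, uniqueId: Long)

  // Same id scheme as the diff: two calls to the shared counter per block.
  // Arguments are evaluated left to right, so streamId gets the smaller value.
  def makeBlockIds(blockCount: Int, idGenerator: AtomicInteger): Array[StreamBlockId] =
    Array.tabulate(blockCount) { _ =>
      StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet())
    }
}
```

The lambda body is indented two spaces relative to the `Array.tabulate` call, following the usual Scala style.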
[GitHub] spark pull request: [SPARK-4027][Streaming] HDFSBasedBlockRDD to r...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2931#discussion_r19454510 --- Diff: streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala --- @@ -0,0 +1,163 @@ ... + private def testHDFSBackedRDD( + writeToBMCount: Int, + writeToHDFSCount: Int, + total: Int, + blockCount: Int +) { +val countPerBlock = total / blockCount +val blockIds = (0 until blockCount).map { i => +StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet()) --- End diff -- indent off
[GitHub] spark pull request: [SPARK-4084] Reuse sort key in Sorter
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2937#discussion_r19454486 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1237,12 +1237,27 @@ private[spark] object Utils extends Logging { /** * Timing method based on iterations that permit JVM JIT optimization. * @param numIters number of iterations - * @param f function to be executed + * @param f function to be executed. If prepare is not None, the running time of each call to f + * must be an order of magnitude longer than one millisecond for accurate timing. + * @param prepare function to be executed before each call to f. Its running time doesn't count. */ - def timeIt(numIters: Int)(f: => Unit): Long = { -val start = System.currentTimeMillis -times(numIters)(f) -System.currentTimeMillis - start + def timeIt(numIters: Int)(f: => Unit, prepare: Option[() => Unit] = None): Long = { --- End diff -- done.
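A minimal sketch of how a `timeIt` with an optional, untimed `prepare` step could be implemented against the signature in the diff. This is not the merged code; the loop bodies are assumptions. With no `prepare` the whole loop is timed once. Otherwise each call to `f` is timed on its own after `prepare` runs, which is why the doc comment warns that `f` must take well over a millisecond when `System.currentTimeMillis` is the clock.

```scala
object TimingSketch {
  /**
   * Runs `f` `numIters` times and returns the elapsed wall-clock time in milliseconds.
   * If `prepare` is defined, it runs before every call to `f` and is excluded from the total.
   */
  def timeIt(numIters: Int)(f: => Unit, prepare: Option[() => Unit] = None): Long = {
    prepare match {
      case None =>
        // No setup step: time the whole loop at once.
        val start = System.currentTimeMillis
        var i = 0
        while (i < numIters) { f; i += 1 }
        System.currentTimeMillis - start
      case Some(setup) =>
        // With a setup step: time each call to f separately and sum the durations.
        var elapsed = 0L
        var i = 0
        while (i < numIters) {
          setup()
          val start = System.currentTimeMillis
          f
          elapsed += System.currentTimeMillis - start
          i += 1
        }
        elapsed
    }
  }
}
```

A typical use would be re-shuffling an input array in `prepare` so that each timed sort starts from unsorted data.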
[GitHub] spark pull request: [SPARK-4084] Reuse sort key in Sorter
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/2937#issuecomment-60713736 @aarondav Does `Jenkins` count?
[GitHub] spark pull request: [SPARK-4084] Reuse sort key in Sorter
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2937#issuecomment-60713603 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22334/ Test PASSed.
[GitHub] spark pull request: [SPARK-4084] Reuse sort key in Sorter
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2937#issuecomment-60713601 [Test build #22334 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22334/consoleFull) for PR 2937 at commit [`0b7b682`](https://github.com/apache/spark/commit/0b7b682ea1fad80de08dbf1d63b56d3785f686ec). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2942#discussion_r19454435 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{Vector => BV} + +import scala.reflect.ClassTag +import scala.util.Random._ + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.Logging +import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.StreamingContext._ + +/** + * :: DeveloperApi :: + * + * StreamingKMeansModel extends MLlib's KMeansModel for streaming + * algorithms, so it can keep track of the number of points assigned + * to each cluster, and also update the model by doing a single iteration + * of the standard KMeans algorithm. + * + * The update algorithm uses the "mini-batch" KMeans rule, + * generalized to incorporate forgetfulness (i.e. decay).
+ * The basic update rule (for each cluster) is: + * + * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t] + * n_t+1 = n_t + m_t + * + * Where c_t is the previously estimated centroid for that cluster, + * n_t is the number of points assigned to it thus far, x_t is the centroid + * estimated on the current batch, and m_t is the number of points assigned + * to that centroid in the current batch. + * + * This update rule is modified with a decay factor 'a' that scales + * the contribution of the clusters as estimated thus far. + * If a=1, all batches are weighted equally. If a=0, new centroids + * are determined entirely by recent data. Lower values correspond to + * more forgetting. + * + * Decay can optionally be specified as a decay fraction 'q', + * which corresponds to the fraction of batches (or points) + * after which the past will be reduced to a contribution of 0.5. + * This decay fraction can be specified in units of 'points' or 'batches'. + * If 'batches', behavior will be independent of the number of points per batch; + * if 'points', the expected number of points per batch must be specified.
+ * + * Use a builder pattern to construct a streaming KMeans analysis + * in an application, like: + * + * val model = new StreamingKMeans() + *.setDecayFactor(0.5) + *.setK(3) + *.setRandomCenters(5) + *.trainOn(DStream) + * + */ +@DeveloperApi +class StreamingKMeansModel( +override val clusterCenters: Array[Vector], +val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging { + + // do a sequential KMeans update on a batch of data + def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = { + +val centers = clusterCenters +val counts = clusterCounts + +// find nearest cluster to each point +val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong))) + +// get sums and counts for updating each cluster +type WeightedPoint = (BV[Double], Long) +def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = { + (p1._1 += p2._1, p1._2 + p2._2) +} +val pointStats: Array[(Int, (BV[Double], Long))] = + closest.reduceByKey{mergeContribs}.collectAsMap().toArray + +// implement update rule +for (newP <- pointStats) { + // store old count and centroid + val oldCount = counts(newP._1) + val oldCentroid = centers(newP._1).toBreeze + // get new count and centroid + val newCount = newP._2._2 + val newCentroid = newP._2._1 / newCount.toDouble + // compute the normalized scale factor that controls forgetting
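The update rule in the doc comment can be checked with a small numeric example. The sketch below is plain Scala over `Array[Double]` instead of Breeze vectors. It assumes the decay factor `a` multiplies the previous count in both places, i.e. c_t+1 = [(a * c_t * n_t) + (x_t * m_t)] / [a * n_t + m_t] and n_t+1 = a * n_t + m_t. That is one reading of "scales the contribution of the clusters as estimated thus far", not necessarily the exact code in the PR; `MiniBatchKMeansSketch` and `decayedUpdate` are hypothetical names.

```scala
object MiniBatchKMeansSketch {
  /**
   * One decayed mini-batch update for a single cluster (assumed form of the rule).
   * @param oldCentroid c_t, the current centroid estimate
   * @param oldCount    n_t, the (possibly decayed) number of points seen so far
   * @param batchSum    sum of the m_t batch points assigned to this cluster, i.e. x_t * m_t
   * @param batchCount  m_t
   * @param a           decay factor in [0, 1]; 1 weights all batches equally, 0 keeps only the batch
   * @return (c_t+1, n_t+1)
   */
  def decayedUpdate(
      oldCentroid: Array[Double],
      oldCount: Double,
      batchSum: Array[Double],
      batchCount: Long,
      a: Double): (Array[Double], Double) = {
    require(oldCentroid.length == batchSum.length, "dimension mismatch")
    val discounted = a * oldCount
    val newCount = discounted + batchCount
    require(newCount > 0, "no points to average")
    // c_t+1 = (a * n_t * c_t + x_t * m_t) / (a * n_t + m_t)
    val newCentroid = Array.tabulate(oldCentroid.length) { d =>
      (discounted * oldCentroid(d) + batchSum(d)) / newCount
    }
    (newCentroid, newCount)
  }
}
```

For example, starting from c_t = (0, 0) with n_t = 10 and a batch of 10 points whose mean is (2, 4): a = 1 gives (1, 2) with count 20, the plain mini-batch average; a = 0 gives (2, 4), the batch mean, matching "new centroids are determined entirely by recent data".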
[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2942#discussion_r19454416 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala --- @@ -0,0 +1,246 @@ ... +// implement update rule +for (newP <- pointStats) { + // store old count and centroid + val oldCount = counts(newP._1) + val oldCentroid = centers(newP._1).toBreeze + // get new count and centroid + val newCount = newP._2._2 + val newCentroid = newP._2._1 / newCount.toDouble + // compute the normalized scale factor that controls forgetting
[GitHub] spark pull request: SPARK-3968 Use parquet-mr filter2 api in spark...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2841#issuecomment-60713375 [Test build #22339 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22339/consoleFull) for PR 2841 at commit [`515df1c`](https://github.com/apache/spark/commit/515df1cc70fec520828f45fb4b52a2643a6e7dcb). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3954][Streaming] promote the speed of c...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/2811#issuecomment-60713367 Maybe they are quite busy, let me ping @tdas.
[GitHub] spark pull request: [SPARK-4110] Wrong comments about default sett...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2972#issuecomment-60713359 [Test build #22338 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22338/consoleFull) for PR 2972 at commit [`5a171a2`](https://github.com/apache/spark/commit/5a171a2ff89ee1bcfbf8458e523c9277a2fe041d). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2953#issuecomment-60713308 Can you write a short high-level design doc for this change and attach it to JIRA?
[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/2969#discussion_r19454387 --- Diff: core/src/main/scala/org/apache/spark/storage/TachyonStore.scala --- @@ -105,25 +106,17 @@ private[spark] class TachyonStore( return None } val is = file.getInStream(ReadType.CACHE) -var buffer: ByteBuffer = null +assert (is != null) try { - if (is != null) { -val size = file.length -val bs = new Array[Byte](size.asInstanceOf[Int]) -val fetchSize = is.read(bs, 0, size.asInstanceOf[Int]) -buffer = ByteBuffer.wrap(bs) -if (fetchSize != size) { - logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " + -s"is not equal to fetched size $fetchSize") - return None -} - } + val size = file.length + val bs = new Array[Byte](size.asInstanceOf[Int]) + ByteStreams.readFully(is, bs) + Some(ByteBuffer.wrap(bs)) } catch { --- End diff -- It's not generally safe to recover from OOM, so why would we want to catch it here?
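The diff above replaces a single `is.read(bs, 0, size)` call with Guava's `ByteStreams.readFully(is, bs)`, because one `read()` call may legitimately return fewer bytes than requested. As a rough illustration of the loop that a "read fully" helper performs, here is a minimal Python sketch; `ShortReadStream` and `read_fully` are hypothetical names for this example only, not Spark, Tachyon, or Guava APIs.

```python
import io


class ShortReadStream(io.RawIOBase):
    """Hypothetical test stream that returns at most `chunk` bytes per read."""

    def __init__(self, data, chunk):
        super().__init__()
        self._data = data
        self._pos = 0
        self._chunk = chunk

    def readable(self):
        return True

    def readinto(self, buf):
        # Deliberately fill only part of the buffer, as a real stream may.
        n = min(len(buf), self._chunk, len(self._data) - self._pos)
        buf[:n] = self._data[self._pos:self._pos + n]
        self._pos += n
        return n


def read_fully(stream, size):
    """Read exactly `size` bytes, looping over short reads; raise EOFError if the stream ends early."""
    buf = bytearray(size)
    view = memoryview(buf)
    filled = 0
    while filled < size:
        n = stream.readinto(view[filled:])
        if not n:  # 0 means EOF; None means no data on a non-blocking stream
            raise EOFError("expected %d bytes, got %d" % (size, filled))
        filled += n
    return bytes(buf)
```

A single `ShortReadStream(b"0123456789", 3).read(10)` returns only `b"012"`, which is exactly the case the old code reported as a size mismatch; `read_fully` keeps reading until the buffer is full or the stream is exhausted.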
[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/2969#discussion_r19454354 --- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala --- @@ -106,7 +112,7 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt var is: FileInputStream = null try { is = new FileInputStream(file) - is.skip(offset) --- End diff -- http://docs.oracle.com/javase/7/docs/api/java/io/FileInputStream.html#skip(long): > Skips over and discards n bytes of data from the input stream. > > The skip method may, for a variety of reasons, end up skipping over some smaller number of bytes, possibly 0. If n is negative, an IOException is thrown, even though the skip method of the InputStream superclass does nothing in this case. The actual number of bytes skipped is returned. > > This method may skip more bytes than are remaining in the backing file. This produces no exception and the number of bytes skipped may include some number of bytes that were beyond the EOF of the backing file. Attempting to read from the stream after skipping past the end will result in -1 indicating the end of the file.
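The quoted Javadoc explains why a bare `is.skip(offset)` is fragile: it may skip fewer bytes than requested, even 0. Java's `skip` has no direct Python counterpart, so this minimal sketch models it with a hypothetical `PartialSkipStream` whose `skip()` may stop short, plus a hypothetical `skip_fully` helper that retries. It is an illustration of the retry pattern, not the code the PR adds, and it does not model the "skip past EOF" case; a caller worried about that could first compare the offset against the file length.

```python
class PartialSkipStream:
    """Hypothetical stream whose skip() advances at most `max_skip` bytes per call."""

    def __init__(self, data, max_skip):
        self._data = data
        self._pos = 0
        self._max_skip = max_skip

    def skip(self, n):
        # Like InputStream.skip: may skip fewer than n bytes, possibly 0.
        k = max(0, min(n, self._max_skip, len(self._data) - self._pos))
        self._pos += k
        return k

    def read(self, n):
        chunk = self._data[self._pos:self._pos + n]
        self._pos += len(chunk)
        return chunk


def skip_fully(stream, n):
    """Skip exactly n bytes, retrying partial skips; raise EOFError if the stream ends first."""
    remaining = n
    while remaining > 0:
        skipped = stream.skip(remaining)
        if skipped == 0:
            # A zero-length skip is ambiguous, so read one byte to tell
            # "no progress this time" apart from "end of stream".
            if not stream.read(1):
                raise EOFError("stream ended with %d bytes left to skip" % remaining)
            skipped = 1
        remaining -= skipped
```

With `max_skip=3`, a single `skip(7)` advances only 3 bytes, while `skip_fully(stream, 7)` loops until all 7 are consumed.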
[GitHub] spark pull request: SPARK-3968 Use parquet-mr filter2 api in spark...
Github user saucam commented on the pull request: https://github.com/apache/spark/pull/2841#issuecomment-60713180 Added a unit test for filter pushdown on an optional column.
[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...
Github user codedeft commented on the pull request: https://github.com/apache/spark/pull/2868#issuecomment-60713165 Here's one number. However, this requires constantly re-caching new node ID caches and unpersisting old ones, which is not reflected in the code yet. I'm not sure whether frequently persisting a new RDD derived from a previously persisted RDD is a cheap operation, but at least on this data set it seems fast. Let me know if you know more about the persistence mechanism. Setup: mnist dataset, 750 columns with 6 rows (only two partitions); 8 executors; 10-class classification; 100 trees trained, max depth 30; Gini impurity; default fraction testing. Without node ID caching, training took 24 minutes 34 seconds. With node ID caching, persisting the cache every two iterations, it took 16 minutes 42 seconds. So we see a noticeable benefit, as long as we frequently re-cache the node ID cache.
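To put the two reported timings on a common scale, this small worked calculation converts them to seconds and derives the speedup and the relative time saved. It uses only the two numbers from the comment above; it is a single run on one data set, so it says nothing about variance.

```python
def to_seconds(minutes, seconds):
    return minutes * 60 + seconds


baseline = to_seconds(24, 34)  # without node ID caching
cached = to_seconds(16, 42)    # with node ID caching, re-persisted every two iterations

speedup = baseline / cached        # how many times faster the cached run was
reduction = 1 - cached / baseline  # fraction of wall-clock time saved

print("speedup: %.2fx, time saved: %.0f%%" % (speedup, reduction * 100))
```

This prints `speedup: 1.47x, time saved: 32%` (1474 s versus 1002 s).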
[GitHub] spark pull request: [SPARK-3954][Streaming] promote the speed of c...
Github user surq commented on the pull request: https://github.com/apache/spark/pull/2811#issuecomment-60713102 @jerryshao, is this an inessential patch? Why has no manager merged it?
[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2969#discussion_r19454325 --- Diff: core/src/main/scala/org/apache/spark/storage/TachyonStore.scala --- @@ -105,25 +106,17 @@ private[spark] class TachyonStore( return None } val is = file.getInStream(ReadType.CACHE) -var buffer: ByteBuffer = null +assert (is != null) try { - if (is != null) { -val size = file.length -val bs = new Array[Byte](size.asInstanceOf[Int]) -val fetchSize = is.read(bs, 0, size.asInstanceOf[Int]) -buffer = ByteBuffer.wrap(bs) -if (fetchSize != size) { - logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " + -s"is not equal to fetched size $fetchSize") - return None -} - } + val size = file.length + val bs = new Array[Byte](size.asInstanceOf[Int]) + ByteStreams.readFully(is, bs) + Some(ByteBuffer.wrap(bs)) } catch { --- End diff -- You probably want to catch other exceptions/errors too; e.g., in this case I'd also try to catch out-of-memory errors.
[GitHub] spark pull request: [SPARK-4110] Wrong comments about default sett...
GitHub user sarutak opened a pull request: https://github.com/apache/spark/pull/2972 [SPARK-4110] Wrong comments about default settings in spark-daemon.sh In spark-daemon.sh, there are the following comments: # SPARK_CONF_DIR Alternate conf dir. Default is ${SPARK_PREFIX}/conf. # SPARK_LOG_DIR Where log files are stored. PWD by default. However, I think the default value for SPARK_CONF_DIR is `${SPARK_HOME}/conf` and for SPARK_LOG_DIR is `${SPARK_HOME}/logs`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/sarutak/spark SPARK-4110 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2972.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2972 commit 5a171a2ff89ee1bcfbf8458e523c9277a2fe041d Author: Kousuke Saruta Date: 2014-10-28T05:50:42Z Fixed wrong comments
[GitHub] spark pull request: [SPARK-4109][CORE] Correctly deserialize Task....
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2971#issuecomment-60712916 Can one of the admins verify this patch?
[GitHub] spark pull request: [SPARK-4109] Correctly deserialize Task.stageI...
GitHub user luluorta opened a pull request: https://github.com/apache/spark/pull/2971 [SPARK-4109] Correctly deserialize Task.stageId The two subclasses of Task, ShuffleMapTask and ResultTask, do not correctly deserialize stageId. Therefore, accessing TaskContext.stageId always returns zero to the user. You can merge this pull request into a Git repository by running: $ git pull https://github.com/luluorta/spark fix-task-ser Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2971.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2971 commit ff35ee6522743989fb85ae5382f28e040af9d8a1 Author: luluorta Date: 2014-10-28T05:45:08Z correctly deserialize Task.stageId
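The PR description does not include the Scala change itself, so this is only a generic Python illustration of the bug class it describes: a hand-written serializer that never writes one field, so the field silently comes back as its default (zero) after a round trip. `BuggyTask` and `FixedTask` are hypothetical classes, not Spark's `ShuffleMapTask` or `ResultTask`; the round trip uses `copy.deepcopy`, which goes through the same `__getstate__`/`__setstate__` hooks that `pickle` uses.

```python
import copy


class BuggyTask:
    """Hypothetical task whose custom serialization forgets stage_id."""

    def __init__(self, stage_id=0, partition=0):
        self.stage_id = stage_id
        self.partition = partition

    def __getstate__(self):
        return {"partition": self.partition}  # stage_id is never written

    def __setstate__(self, state):
        self.__init__(partition=state["partition"])  # stage_id falls back to 0


class FixedTask(BuggyTask):
    """Same task, but every field is written and read back."""

    def __getstate__(self):
        return {"stage_id": self.stage_id, "partition": self.partition}

    def __setstate__(self, state):
        self.__init__(stage_id=state["stage_id"], partition=state["partition"])


restored = copy.deepcopy(BuggyTask(stage_id=7, partition=3))
print(restored.stage_id, restored.partition)  # stage_id lost, partition kept
```

This prints `0 3`: the partition survives but the stage ID reads as zero, which matches the symptom reported for `TaskContext.stageId`.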
[GitHub] spark pull request: [SPARK-4107] Fix incorrect handling of read() ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2969#discussion_r19454231 --- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala --- @@ -106,7 +112,7 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt var is: FileInputStream = null try { is = new FileInputStream(file) - is.skip(offset) --- End diff -- what is the problem with the old code here?
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19454193 --- Diff: python/pyspark/mllib/feature.py --- @@ -95,33 +385,26 @@ class Word2Vec(object): >>> localDoc = [sentence, sentence] >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" ")) >>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc) + >>> syms = model.findSynonyms("a", 2) ->>> str(syms[0][0]) -'b' ->>> str(syms[1][0]) -'c' ->>> len(syms) -2 +>>> [s[0] for s in syms] +[u'b', u'c'] >>> vec = model.transform("a") ->>> len(vec) -10 >>> syms = model.findSynonyms(vec, 2) ->>> str(syms[0][0]) -'b' ->>> str(syms[1][0]) -'c' ->>> len(syms) -2 +>>> [s[0] for s in syms] +[u'b', u'c'] """ def __init__(self): """ Construct Word2Vec instance """ +import random # this can't be on the top because of mllib.random + self.vectorSize = 100 self.learningRate = 0.025 self.numPartitions = 1 self.numIterations = 1 -self.seed = 42L +self.seed = random.randint(0, sys.maxint) --- End diff -- `sys.maxint` -> `2 ** 32 - 1` (see #2889)
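A minimal sketch of the suggested change, assuming the goal is simply a random default seed in a fixed range: `sys.maxint` is platform-dependent in Python 2 and no longer exists in Python 3, whereas `2 ** 32 - 1` is the same everywhere. `default_seed` and `_MAX_SEED` are hypothetical names for this example, not PySpark code; passing an explicit `random.Random` makes the choice reproducible in tests.

```python
import random

# Fixed upper bound suggested in the review comment, instead of sys.maxint.
_MAX_SEED = 2 ** 32 - 1


def default_seed(rng=random):
    """Return a random seed in [0, 2**32 - 1]; random.randint includes both ends."""
    return rng.randint(0, _MAX_SEED)
```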
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19454180 --- Diff: python/pyspark/mllib/feature.py --- @@ -18,59 +18,348 @@ """ Python package for feature in MLlib. """ +import sys +import warnings + +import py4j.protocol +from py4j.protocol import Py4JJavaError +from py4j.java_gateway import JavaObject + +from pyspark import RDD, SparkContext from pyspark.serializers import PickleSerializer, AutoBatchedSerializer -from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd +from pyspark.mllib.linalg import Vectors, _to_java_object_rdd + +__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler', + 'HashTF', 'IDFModel', 'IDF', + 'Word2Vec', 'Word2VecModel'] + + +# Hack for support float('inf') in Py4j +old_smart_decode = py4j.protocol.smart_decode --- End diff -- Shall we use underscores for those private vars and functions: `old_smart_decode`, `float_str_mapping`, `new_smart_decode`?
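A sketch of what the quoted hack could look like with the suggested underscore-prefixed names, which mark them as module-internal by Python convention. To stay self-contained it patches a hypothetical stand-in namespace rather than the real `py4j.protocol`, and it uses Python 3's `str` where the original calls `unicode`; the float-to-string mapping is copied from the diff.

```python
import types

# Hypothetical stand-in for a module such as py4j.protocol (not the real module).
protocol = types.SimpleNamespace(smart_decode=lambda obj: str(obj))

# Leading underscores mark these helpers as private to the module.
_old_smart_decode = protocol.smart_decode

_float_str_mapping = {
    "nan": "NaN",
    "inf": "Infinity",
    "-inf": "-Infinity",
}


def _new_smart_decode(obj):
    # Rewrite Python's spellings of special floats into Java's; delegate everything else.
    if isinstance(obj, float):
        s = str(obj)
        return _float_str_mapping.get(s, s)
    return _old_smart_decode(obj)


protocol.smart_decode = _new_smart_decode
```

After patching, `protocol.smart_decode(float("inf"))` returns `"Infinity"`, while ordinary values such as `1.5` or `42` pass through unchanged.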
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19454191 --- Diff: python/pyspark/mllib/feature.py --- @@ -18,59 +18,348 @@ """ Python package for feature in MLlib. """ +import sys +import warnings + +import py4j.protocol +from py4j.protocol import Py4JJavaError +from py4j.java_gateway import JavaObject + +from pyspark import RDD, SparkContext from pyspark.serializers import PickleSerializer, AutoBatchedSerializer -from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd +from pyspark.mllib.linalg import Vectors, _to_java_object_rdd + +__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler', + 'HashTF', 'IDFModel', 'IDF', + 'Word2Vec', 'Word2VecModel'] + + +# Hack for support float('inf') in Py4j +old_smart_decode = py4j.protocol.smart_decode + +float_str_mapping = { +u'nan': u'NaN', +u'inf': u'Infinity', +u'-inf': u'-Infinity', +} + + +def new_smart_decode(obj): +if isinstance(obj, float): +s = unicode(obj) +return float_str_mapping.get(s, s) +return old_smart_decode(obj) + +py4j.protocol.smart_decode = new_smart_decode + + +# TODO: move these helper functions into utils +_picklable_classes = [ +'LinkedList', +'SparseVector', +'DenseVector', +'DenseMatrix', +'Rating', +'LabeledPoint', +] + + +def _py2java(sc, a): +""" Convert Python object into Java """ +if isinstance(a, RDD): +a = _to_java_object_rdd(a) +elif not isinstance(a, (int, long, float, bool, basestring)): +bytes = bytearray(PickleSerializer().dumps(a)) +a = sc._jvm.SerDe.loads(bytes) +return a + + +def _java2py(sc, r): +if isinstance(r, JavaObject): +clsName = r.getClass().getSimpleName() +if clsName in ("RDD", "JavaRDD"): +if clsName == "RDD": +r = r.toJavaRDD() +jrdd = sc._jvm.SerDe.javaToPython(r) +return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer())) -__all__ = ['Word2Vec', 'Word2VecModel'] +elif clsName in _picklable_classes: +r = sc._jvm.SerDe.dumps(r) +if isinstance(r, bytearray): +r = 
PickleSerializer().loads(str(r)) +return r -class Word2VecModel(object): + +def _callJavaFunc(sc, func, *args): +""" Call Java Function """ -class for Word2Vec model +args = [_py2java(sc, a) for a in args] +return _java2py(sc, func(*args)) + + +def _callAPI(sc, name, *args): +""" Call API in PythonMLLibAPI """ -def __init__(self, sc, java_model): +api = getattr(sc._jvm.PythonMLLibAPI(), name) +return _callJavaFunc(sc, api, *args) + + +class VectorTransformer(object): +""" +:: DeveloperApi :: +Base class for transformation of a vector or RDD of vector +""" +def transform(self, vector): """ -:param sc: Spark context -:param java_model: Handle to Java model object +Applies transformation on a vector. + +:param vector: vector to be transformed. """ +raise NotImplementedError + + +class Normalizer(VectorTransformer): +""" +:: Experimental :: +Normalizes samples individually to unit L^p^ norm + +For any 1 <= p <= float('inf'), normalizes samples using +sum(abs(vector).^p^)^(1/p)^ as norm. + +For p = float('inf'), max(abs(vector)) will be used as norm for normalization. + +>>> v = Vectors.dense(range(3)) +>>> nor = Normalizer(1) +>>> nor.transform(v) +DenseVector([0.0, 0., 0.6667]) + +>>> rdd = sc.parallelize([v]) +>>> nor.transform(rdd).collect() +[DenseVector([0.0, 0., 0.6667])] + +>>> nor2 = Normalizer(float("inf")) +>>> nor2.transform(v) +DenseVector([0.0, 0.5, 1.0]) +""" +def __init__(self, p=2): +""" +:param p: Normalization in L^p^ space, p = 2 by default. +""" +assert p >= 1.0, "p should be greater than 1.0" +self.p = float(p) + +def transform(self, vector): +""" +Applies unit length normalization on a vector. + +:param vector: vector to be normalized. +:return: normalized vector. If the norm of the input is zero, it +will return the input vector. +""" +sc = SparkContext._active_spark_context +assert s
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19454184 --- Diff: python/pyspark/mllib/feature.py --- @@ -18,59 +18,348 @@ """ Python package for feature in MLlib. """ +import sys +import warnings + +import py4j.protocol +from py4j.protocol import Py4JJavaError +from py4j.java_gateway import JavaObject + +from pyspark import RDD, SparkContext from pyspark.serializers import PickleSerializer, AutoBatchedSerializer -from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd +from pyspark.mllib.linalg import Vectors, _to_java_object_rdd + +__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler', + 'HashTF', 'IDFModel', 'IDF', + 'Word2Vec', 'Word2VecModel'] + + +# Hack for support float('inf') in Py4j +old_smart_decode = py4j.protocol.smart_decode + +float_str_mapping = { +u'nan': u'NaN', +u'inf': u'Infinity', +u'-inf': u'-Infinity', +} + + +def new_smart_decode(obj): +if isinstance(obj, float): +s = unicode(obj) +return float_str_mapping.get(s, s) +return old_smart_decode(obj) + +py4j.protocol.smart_decode = new_smart_decode + + +# TODO: move these helper functions into utils +_picklable_classes = [ +'LinkedList', +'SparseVector', +'DenseVector', +'DenseMatrix', +'Rating', +'LabeledPoint', +] + + +def _py2java(sc, a): +""" Convert Python object into Java """ +if isinstance(a, RDD): +a = _to_java_object_rdd(a) +elif not isinstance(a, (int, long, float, bool, basestring)): +bytes = bytearray(PickleSerializer().dumps(a)) +a = sc._jvm.SerDe.loads(bytes) +return a + + +def _java2py(sc, r): +if isinstance(r, JavaObject): +clsName = r.getClass().getSimpleName() +if clsName in ("RDD", "JavaRDD"): +if clsName == "RDD": +r = r.toJavaRDD() +jrdd = sc._jvm.SerDe.javaToPython(r) +return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer())) -__all__ = ['Word2Vec', 'Word2VecModel'] +elif clsName in _picklable_classes: +r = sc._jvm.SerDe.dumps(r) +if isinstance(r, bytearray): +r = 
PickleSerializer().loads(str(r)) +return r -class Word2VecModel(object): + +def _callJavaFunc(sc, func, *args): +""" Call Java Function """ -class for Word2Vec model +args = [_py2java(sc, a) for a in args] +return _java2py(sc, func(*args)) + + +def _callAPI(sc, name, *args): +""" Call API in PythonMLLibAPI """ -def __init__(self, sc, java_model): +api = getattr(sc._jvm.PythonMLLibAPI(), name) +return _callJavaFunc(sc, api, *args) + + +class VectorTransformer(object): +""" +:: DeveloperApi :: +Base class for transformation of a vector or RDD of vector +""" +def transform(self, vector): """ -:param sc: Spark context -:param java_model: Handle to Java model object +Applies transformation on a vector. + +:param vector: vector to be transformed. """ +raise NotImplementedError + + +class Normalizer(VectorTransformer): +""" +:: Experimental :: +Normalizes samples individually to unit L^p^ norm + +For any 1 <= p <= float('inf'), normalizes samples using +sum(abs(vector).^p^)^(1/p)^ as norm. + +For p = float('inf'), max(abs(vector)) will be used as norm for normalization. + +>>> v = Vectors.dense(range(3)) +>>> nor = Normalizer(1) +>>> nor.transform(v) +DenseVector([0.0, 0., 0.6667]) + +>>> rdd = sc.parallelize([v]) +>>> nor.transform(rdd).collect() +[DenseVector([0.0, 0., 0.6667])] + +>>> nor2 = Normalizer(float("inf")) +>>> nor2.transform(v) +DenseVector([0.0, 0.5, 1.0]) +""" +def __init__(self, p=2): --- End diff -- `2` -> `2.0`?
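The `Normalizer` docstring in this diff defines the norm as (sum of |x_i|^p)^(1/p) for 1 <= p < inf and as max |x_i| for p = inf, and the quoted `transform` returns the input unchanged when its norm is zero. The following plain-Python sketch of that arithmetic runs without Spark; `normalize` is a hypothetical helper, not the PySpark implementation, which delegates to the JVM. Writing the default as `2.0` keeps `p` a float from the start, although the quoted `__init__` also applies `float(p)`.

```python
import math


def normalize(values, p=2.0):
    """Scale `values` to unit L_p norm; return them unchanged if the norm is 0."""
    p = float(p)
    if p < 1.0:
        raise ValueError("p should be at least 1.0")
    if math.isinf(p):
        norm = max(abs(v) for v in values)  # L_inf: largest absolute value
    else:
        norm = sum(abs(v) ** p for v in values) ** (1.0 / p)
    if norm == 0.0:
        return list(values)
    return [v / norm for v in values]
```

For the docstring's vector `[0, 1, 2]`, `p = 1` divides by 3, giving about `[0.0, 0.3333, 0.6667]`, and `p = inf` divides by 2, giving `[0.0, 0.5, 1.0]`.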
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19454187 --- Diff: python/pyspark/mllib/feature.py --- @@ -18,59 +18,348 @@ """ Python package for feature in MLlib. """ +import sys +import warnings + +import py4j.protocol +from py4j.protocol import Py4JJavaError +from py4j.java_gateway import JavaObject + +from pyspark import RDD, SparkContext from pyspark.serializers import PickleSerializer, AutoBatchedSerializer -from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd +from pyspark.mllib.linalg import Vectors, _to_java_object_rdd + +__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler', + 'HashTF', 'IDFModel', 'IDF', + 'Word2Vec', 'Word2VecModel'] + + +# Hack for support float('inf') in Py4j +old_smart_decode = py4j.protocol.smart_decode + +float_str_mapping = { +u'nan': u'NaN', +u'inf': u'Infinity', +u'-inf': u'-Infinity', +} + + +def new_smart_decode(obj): +if isinstance(obj, float): +s = unicode(obj) +return float_str_mapping.get(s, s) +return old_smart_decode(obj) + +py4j.protocol.smart_decode = new_smart_decode + + +# TODO: move these helper functions into utils +_picklable_classes = [ +'LinkedList', +'SparseVector', +'DenseVector', +'DenseMatrix', +'Rating', +'LabeledPoint', +] + + +def _py2java(sc, a): +""" Convert Python object into Java """ +if isinstance(a, RDD): +a = _to_java_object_rdd(a) +elif not isinstance(a, (int, long, float, bool, basestring)): +bytes = bytearray(PickleSerializer().dumps(a)) +a = sc._jvm.SerDe.loads(bytes) +return a + + +def _java2py(sc, r): +if isinstance(r, JavaObject): +clsName = r.getClass().getSimpleName() +if clsName in ("RDD", "JavaRDD"): +if clsName == "RDD": +r = r.toJavaRDD() +jrdd = sc._jvm.SerDe.javaToPython(r) +return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer())) -__all__ = ['Word2Vec', 'Word2VecModel'] +elif clsName in _picklable_classes: +r = sc._jvm.SerDe.dumps(r) +if isinstance(r, bytearray): +r = 
PickleSerializer().loads(str(r)) +return r -class Word2VecModel(object): + +def _callJavaFunc(sc, func, *args): +""" Call Java Function """ -class for Word2Vec model +args = [_py2java(sc, a) for a in args] +return _java2py(sc, func(*args)) + + +def _callAPI(sc, name, *args): +""" Call API in PythonMLLibAPI """ -def __init__(self, sc, java_model): +api = getattr(sc._jvm.PythonMLLibAPI(), name) +return _callJavaFunc(sc, api, *args) + + +class VectorTransformer(object): +""" +:: DeveloperApi :: +Base class for transformation of a vector or RDD of vector +""" +def transform(self, vector): """ -:param sc: Spark context -:param java_model: Handle to Java model object +Applies transformation on a vector. + +:param vector: vector to be transformed. """ +raise NotImplementedError + + +class Normalizer(VectorTransformer): +""" +:: Experimental :: +Normalizes samples individually to unit L^p^ norm + +For any 1 <= p <= float('inf'), normalizes samples using +sum(abs(vector).^p^)^(1/p)^ as norm. + +For p = float('inf'), max(abs(vector)) will be used as norm for normalization. + +>>> v = Vectors.dense(range(3)) +>>> nor = Normalizer(1) +>>> nor.transform(v) +DenseVector([0.0, 0., 0.6667]) + +>>> rdd = sc.parallelize([v]) +>>> nor.transform(rdd).collect() +[DenseVector([0.0, 0., 0.6667])] + +>>> nor2 = Normalizer(float("inf")) +>>> nor2.transform(v) +DenseVector([0.0, 0.5, 1.0]) +""" +def __init__(self, p=2): +""" +:param p: Normalization in L^p^ space, p = 2 by default. +""" +assert p >= 1.0, "p should be greater than 1.0" +self.p = float(p) + +def transform(self, vector): +""" +Applies unit length normalization on a vector. + +:param vector: vector to be normalized. +:return: normalized vector. If the norm of the input is zero, it +will return the input vector. +""" +sc = SparkContext._active_spark_context +assert s
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19454177 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala --- @@ -20,6 +20,7 @@ package org.apache.spark.mllib.feature import org.apache.spark.annotation.DeveloperApi import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD +import org.apache.spark.api.java.JavaRDD --- End diff -- organize imports in alphabetical order
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19454181 --- Diff: python/pyspark/mllib/feature.py --- @@ -18,59 +18,348 @@ """ Python package for feature in MLlib. """ +import sys +import warnings + +import py4j.protocol +from py4j.protocol import Py4JJavaError +from py4j.java_gateway import JavaObject + +from pyspark import RDD, SparkContext from pyspark.serializers import PickleSerializer, AutoBatchedSerializer -from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd +from pyspark.mllib.linalg import Vectors, _to_java_object_rdd + +__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler', + 'HashTF', 'IDFModel', 'IDF', + 'Word2Vec', 'Word2VecModel'] + + +# Hack for support float('inf') in Py4j +old_smart_decode = py4j.protocol.smart_decode + +float_str_mapping = { +u'nan': u'NaN', +u'inf': u'Infinity', +u'-inf': u'-Infinity', +} + + +def new_smart_decode(obj): +if isinstance(obj, float): +s = unicode(obj) +return float_str_mapping.get(s, s) +return old_smart_decode(obj) + +py4j.protocol.smart_decode = new_smart_decode + + +# TODO: move these helper functions into utils +_picklable_classes = [ +'LinkedList', +'SparseVector', +'DenseVector', +'DenseMatrix', +'Rating', +'LabeledPoint', +] + + +def _py2java(sc, a): +""" Convert Python object into Java """ +if isinstance(a, RDD): +a = _to_java_object_rdd(a) +elif not isinstance(a, (int, long, float, bool, basestring)): +bytes = bytearray(PickleSerializer().dumps(a)) +a = sc._jvm.SerDe.loads(bytes) +return a + + +def _java2py(sc, r): +if isinstance(r, JavaObject): +clsName = r.getClass().getSimpleName() +if clsName in ("RDD", "JavaRDD"): +if clsName == "RDD": +r = r.toJavaRDD() +jrdd = sc._jvm.SerDe.javaToPython(r) +return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer())) -__all__ = ['Word2Vec', 'Word2VecModel'] +elif clsName in _picklable_classes: +r = sc._jvm.SerDe.dumps(r) +if isinstance(r, bytearray): +r = 
PickleSerializer().loads(str(r)) +return r -class Word2VecModel(object): + +def _callJavaFunc(sc, func, *args): +""" Call Java Function """ -class for Word2Vec model +args = [_py2java(sc, a) for a in args] +return _java2py(sc, func(*args)) + + +def _callAPI(sc, name, *args): +""" Call API in PythonMLLibAPI """ -def __init__(self, sc, java_model): +api = getattr(sc._jvm.PythonMLLibAPI(), name) +return _callJavaFunc(sc, api, *args) + + +class VectorTransformer(object): +""" +:: DeveloperApi :: +Base class for transformation of a vector or RDD of vector +""" +def transform(self, vector): """ -:param sc: Spark context -:param java_model: Handle to Java model object +Applies transformation on a vector. + +:param vector: vector to be transformed. """ +raise NotImplementedError + + +class Normalizer(VectorTransformer): +""" +:: Experimental :: +Normalizes samples individually to unit L^p^ norm --- End diff -- `L^p^` doesn't show up correctly in the generated doc. This is `L` with subscript `p`, so with Sphinx it should be ~~~ L\ :sub:`p`\ norm ~~~
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19454172 --- Diff: docs/mllib-feature-extraction.md --- @@ -267,4 +346,25 @@ val data1 = data.map(x => (x.label, normalizer1.transform(x.features))) val data2 = data.map(x => (x.label, normalizer2.transform(x.features))) {% endhighlight %} + + +{% highlight python %} +from pyspark.mllib.util import MLUtils +from pyspark.mllib.linalg import Vectors +from pyspark.mllib.feature import Normalizer + +data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") +label = data.map(lambda x: x.label) --- End diff -- `label` -> `labels`
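As a Spark-free illustration of what the Normalizer example computes, here is a minimal sketch in plain Python, assuming the usual definition of scaling each vector by its L^p norm (with `math.inf` for the max norm) and leaving zero vectors unchanged. The helper `lp_normalize` and the two sample records are hypothetical stand-ins for the LibSVM data; the point is that a `labels` collection is zipped back with the normalized features, as the Scala example keeps `x.label` next to each transformed vector. The docstring also shows the Sphinx subscript markup suggested for `feature.py`; a raw docstring keeps its backslashes intact.

```python
import math
from typing import List, Sequence


def lp_normalize(vector: Sequence[float], p: float = 2.0) -> List[float]:
    r"""Scale a vector to unit L\ :sub:`p`\ norm.

    ``p`` may be ``math.inf`` for the max norm. Zero vectors are
    returned unchanged, since they cannot be scaled to unit norm.
    """
    if p == math.inf:
        norm = max((abs(x) for x in vector), default=0.0)
    else:
        norm = sum(abs(x) ** p for x in vector) ** (1.0 / p)
    if norm == 0.0:
        return list(vector)
    return [x / norm for x in vector]


# Hypothetical (label, features) records standing in for the LibSVM file.
data = [(0.0, [3.0, 4.0]), (1.0, [1.0, -2.0, 2.0])]

# Split labels from features, normalize the features, then zip the
# labels back on so each result stays paired with its label.
labels = [label for label, _ in data]
features = [feats for _, feats in data]

data1 = list(zip(labels, (lp_normalize(f) for f in features)))            # L2 norm
data2 = list(zip(labels, (lp_normalize(f, math.inf) for f in features)))  # max norm

if __name__ == "__main__":
    print(data1)
    print(data2)
```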
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19454179 --- Diff: python/pyspark/mllib/feature.py --- @@ -18,59 +18,348 @@ """ Python package for feature in MLlib. """ +import sys +import warnings + +import py4j.protocol +from py4j.protocol import Py4JJavaError +from py4j.java_gateway import JavaObject + +from pyspark import RDD, SparkContext from pyspark.serializers import PickleSerializer, AutoBatchedSerializer -from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd +from pyspark.mllib.linalg import Vectors, _to_java_object_rdd + +__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler', + 'HashTF', 'IDFModel', 'IDF', --- End diff -- `HashTF` -> `HashingTF`
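The name `HashingTF` refers to the hashing trick: each term is mapped straight to a column index by hashing it modulo the number of features, so no vocabulary has to be stored and any term type can be counted. A minimal sketch of that idea, not PySpark's implementation: the function `hashing_tf` and its `num_features` default are hypothetical, and it swaps in `zlib.crc32` over `str(term)` because Python's built-in `hash()` on strings is randomized per process unless `PYTHONHASHSEED` is fixed.

```python
import zlib
from collections import Counter
from typing import Dict, Hashable, Iterable


def hashing_tf(document: Iterable[Hashable], num_features: int = 1 << 20) -> Dict[int, int]:
    """Term frequencies keyed by hashed column index (a sparse vector as a dict).

    Terms of any type are hashed through their str() form with CRC-32,
    which is stable across processes. Distinct terms may collide in the
    same column; their counts are then added together.
    """
    counts: Counter = Counter()
    for term in document:
        index = zlib.crc32(str(term).encode("utf-8")) % num_features
        counts[index] += 1
    return dict(counts)


if __name__ == "__main__":
    doc = "a rose is a rose is a rose".split()
    print(hashing_tf(doc, num_features=16))
```

A small `num_features` such as 16 makes collisions likely; the large default trades memory for fewer collisions.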
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19454164 --- Diff: docs/mllib-feature-extraction.md --- @@ -162,6 +204,20 @@ for((synonym, cosineSimilarity) <- synonyms) { } {% endhighlight %} + --- End diff -- Shall we skip the Python examples for `Word2Vec` in this PR? #2952 added example code for it.
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19454166 --- Diff: docs/mllib-feature-extraction.md --- @@ -267,4 +346,25 @@ val data1 = data.map(x => (x.label, normalizer1.transform(x.features))) val data2 = data.map(x => (x.label, normalizer2.transform(x.features))) {% endhighlight %} + + +{% highlight python %} +from pyspark.mllib.util import MLUtils +from pyspark.mllib.linalg import Vectors +from pyspark.mllib.feature import Normalizer + +data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") +label = data.map(lambda x: x.label) --- End diff -- `label` is not used
[GitHub] spark pull request: [SPARK-3961] [MLlib] [PySpark] Python API for ...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2819#discussion_r19454162 --- Diff: docs/mllib-feature-extraction.md --- @@ -95,8 +95,50 @@ tf.cache() val idf = new IDF(minDocFreq = 2).fit(tf) val tfidf: RDD[Vector] = idf.transform(tf) {% endhighlight %} + + + +TF and IDF are implemented in [HashingTF](api/python/pyspark.mllib.html#pyspark.mllib.feature.HashingTF) +and [IDF](api/python/pyspark.mllib.html#pyspark.mllib.feature.IDF). +`HashingTF` takes an RDD of list as the input. +Each record could be an iterable of strings or other types. + +{% highlight python %} +from pyspark import SparkContext +from pyspark.mllib.linalg import Vector --- End diff -- `Vector` is not used
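The Scala context in this diff fits `new IDF(minDocFreq = 2)` on the TF vectors and then scales them. A plain-Python sketch of that weighting, assuming the smoothed formula from the MLlib feature-extraction guide, idf(t) = log((m + 1) / (df(t) + 1)) for m documents, and assuming that terms seen in fewer than `minDocFreq` documents get weight 0. The helpers `fit_idf` and `apply_idf` are hypothetical, and dense lists stand in for MLlib vectors.

```python
import math
from typing import List, Sequence


def fit_idf(tf_vectors: Sequence[Sequence[float]], min_doc_freq: int = 0) -> List[float]:
    """Per-column weights log((m + 1) / (df + 1)), or 0.0 when df < min_doc_freq."""
    m = len(tf_vectors)
    num_features = len(tf_vectors[0]) if tf_vectors else 0
    # Document frequency: number of documents with a non-zero count in column j.
    df = [sum(1 for vec in tf_vectors if vec[j] != 0) for j in range(num_features)]
    return [
        math.log((m + 1.0) / (d + 1.0)) if d >= min_doc_freq else 0.0
        for d in df
    ]


def apply_idf(tf_vectors: Sequence[Sequence[float]], idf: Sequence[float]) -> List[List[float]]:
    """Scale every TF vector element-wise by the fitted IDF weights."""
    return [[tf * w for tf, w in zip(vec, idf)] for vec in tf_vectors]


if __name__ == "__main__":
    tf = [[1.0, 0.0, 2.0], [1.0, 1.0, 0.0], [0.0, 0.0, 3.0]]
    idf = fit_idf(tf, min_doc_freq=2)  # column 1 appears in only one document
    print(idf)
    print(apply_idf(tf, idf))
```

With the smoothing, a term present in every document gets log(1) = 0, so it contributes nothing to the TF-IDF vectors.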
[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...
Github user codedeft commented on the pull request: https://github.com/apache/spark/pull/2868#issuecomment-60712109 Currently doing some performance testing.
[GitHub] spark pull request: [SPARK-3343] [SQL] Add serde support for CTAS
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2570#issuecomment-60711633 [Test build #22337 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22337/consoleFull) for PR 2570 at commit [`e011ef5`](https://github.com/apache/spark/commit/e011ef557be3f438c5052e65462d5fbb89b51b6d). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4084] Reuse sort key in Sorter
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2937#issuecomment-60711412 [Test build #22336 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22336/consoleFull) for PR 2937 at commit [`c63927f`](https://github.com/apache/spark/commit/c63927f26473a3dbe664bc3621aeea12109f7afb). * This patch **fails Python style tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-4084] Reuse sort key in Sorter
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2937#issuecomment-60711414 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22336/ Test FAILed.
[GitHub] spark pull request: [SPARK-4084] Reuse sort key in Sorter
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2937#issuecomment-60711341 [Test build #22336 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22336/consoleFull) for PR 2937 at commit [`c63927f`](https://github.com/apache/spark/commit/c63927f26473a3dbe664bc3621aeea12109f7afb). * This patch merges cleanly.