Re: spark.catalog.listFunctions type signatures
Hi Jacek, Thanks for the hints. I would rather have the information statically than build a logical plan. I'm using Apache Calcite to build SQL expressions and then I feed them to Spark to run, so the pipeline goes like this: initial query in SQL (from the user) + schema definition (from db) + UDF definitions (Spark + custom libs such as Sedona) => Calcite query plan (+ transformations from business logic) => SQL (with the Spark dialect). In my case, UDF definitions are an input I would get from whatever is loaded in Spark. That's why it's more convenient to have the information statically here. (Bonus: it would be helpful to generate documentation: https://spark.apache.org/docs/latest/api/sql/index.html) For example, this is how you would define the acos type signature: https://github.com/apache/calcite/blob/5c7be55ffee836366dcc7fefb6adfc0b8c47465f/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java#L1692-L1696 On Tue, Mar 28, 2023 at 3:27 PM Jacek Laskowski wrote: > Hi, > > Interesting question indeed! > > The closest I could get would be to use lookupFunctionBuilder(name: > FunctionIdentifier): Option[FunctionBuilder] [1] followed by extracting the > dataType from T in `type FunctionBuilder = Seq[Expression] => T` which can > be Expression (regular functions) or LogicalPlan (table-valued functions). > Expression has got dataType while LogicalPlan has got output > (or outputAttributes). > > HTH > > Let us know how you're doing. > > BTW, Can you describe how you "using Apache Calcite to run some SQL > transformations on Apache sparks SQL statements"? 
> > [1] > https://github.com/apache/spark/blob/e60ce3e85081ca8bb247aeceb2681faf6a59a056/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L91 > > Pozdrawiam, > Jacek Laskowski > > "The Internals Of" Online Books <https://books.japila.pl/> > Follow me on https://twitter.com/jaceklaskowski > > <https://twitter.com/jaceklaskowski> > > > On Tue, Mar 28, 2023 at 9:01 PM Guillaume Masse < > masse.guilla...@narrative.io> wrote: > >> Hi, >> >> I'm using Apache Calcite to run some SQL transformations on Apache sparks >> SQL statements. I would like to extract the type signature out >> of spark.catalog.listFunctions to be able to register them in Calcite with >> their proper signature. >> >> From the API, I can get the fully qualified class name and the name, but >> unfortunately, the type signature is not present. Would there be a way to >> use reflection to extract? For example: >> >> >> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala#L424 >> >> Ideally, it would be convenient to get the type signature >> from org.apache.spark.sql.catalog.Function itself when available. >> >> >> -- >> Guillaume Massé >> [Gee-OHM] >> (马赛卫) >> >
spark.catalog.listFunctions type signatures
Hi, I'm using Apache Calcite to run some SQL transformations on Apache Spark SQL statements. I would like to extract the type signatures out of spark.catalog.listFunctions to be able to register the functions in Calcite with their proper signatures. From the API, I can get the fully qualified class name and the name, but unfortunately, the type signature is not present. Would there be a way to use reflection to extract it? For example: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala#L424 Ideally, it would be convenient to get the type signature from org.apache.spark.sql.catalog.Function itself when available. -- Guillaume Massé [Gee-OHM] (马赛卫)
[Spark SQL] Can Structured Streaming in Python connect to Cassandra?
Hello, I am a student and I am currently doing a big data project. Here is my code: https://gist.github.com/Balykoo/262d94a7073d5a7e16dfb0d0a576b9c3 My project is to retrieve messages from a Twitch chat and send them into Kafka; Spark then reads the Kafka topic to perform the processing in the provided gist. I want to send these messages into Cassandra. I tested a first solution on line 72 which works, but when there are too many messages Spark crashes, probably because my function connects to Cassandra each time it is called. I tried the object approach to share the connection object, but without success: _pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object Can you please tell me how to do this? Or at least give me some advice? Sincerely, FARCY Guillaume. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
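A common workaround for this pickling error is to keep the connection in a module-level cache, so that each Python worker process creates it lazily and only a plain function is ever serialized. A minimal sketch of the pattern — FakeCassandraSession and the query string are invented stand-ins, not the real cassandra-driver API:

```python
_connection = None  # one per Python worker process, never pickled

class FakeCassandraSession:
    """Stand-in for a real driver session (hypothetical, for illustration)."""
    def __init__(self):
        self.inserted = []

    def execute(self, query, params):
        self.inserted.append((query, params))

def get_session():
    # Lazy, module-level singleton: created once per process on first use.
    global _connection
    if _connection is None:
        _connection = FakeCassandraSession()  # real code would connect to the cluster here
    return _connection

def save_partition(rows):
    # Pass this plain function to df.foreachPartition(save_partition);
    # only the function is shipped to workers, the connection stays local.
    session = get_session()
    for row in rows:
        session.execute("INSERT INTO messages (user, text) VALUES (%s, %s)", row)

save_partition([("alice", "hi"), ("bob", "hello")])
save_partition([("carol", "hey")])
```

The key point is that the connection object itself never crosses a serialization boundary, which is what triggers the `_thread.RLock` pickling error.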
Re: Problem mixing MESOS Cluster Mode and Docker task execution
Glad to hear it. Thanks all for sharing your solutions. On Thu, Mar 10, 2016 at 19:19, Eran Chinthaka Withana <eran.chinth...@gmail.com> wrote: > Phew, it worked. All I had to do was to add *export > SPARK_JAVA_OPTS="-Dspark.mesos.executor.docker.image=echinthaka/mesos-spark:0.23.1-1.6.0-2.6" > *before calling spark-submit. Guillaume, thanks for the pointer. > > Timothy, thanks for looking into this. Looking forward to seeing a fix soon. > > Thanks, > Eran Chinthaka Withana > > On Thu, Mar 10, 2016 at 10:10 AM, Tim Chen <t...@mesosphere.io> wrote: > >> Hi Eran, >> >> I need to investigate but perhaps that's true, we're using >> SPARK_JAVA_OPTS to pass all the options and not --conf. >> >> I'll take a look at the bug, but if you can try the workaround and see if >> that fixes your problem. >> >> Tim >> >> On Thu, Mar 10, 2016 at 10:08 AM, Eran Chinthaka Withana < >> eran.chinth...@gmail.com> wrote: >> >>> Hi Timothy >>> >>> What version of spark are you guys running? >>>> >>> >>> I'm using Spark 1.6.0. You can see the Dockerfile I used here: >>> https://github.com/echinthaka/spark-mesos-docker/blob/master/docker/mesos-spark/Dockerfile >>> >>> >>> >>>> And also did you set the working dir in your image to be spark home? >>>> >>> >>> Yes I did. You can see it here: https://goo.gl/8PxtV8 >>> >>> Can it be because of this: >>> https://issues.apache.org/jira/browse/SPARK-13258 as Guillaume pointed >>> out above? As you can see, I'm passing in the docker image URI through >>> spark-submit (--conf spark.mesos.executor.docker. >>> image=echinthaka/mesos-spark:0.23.1-1.6.0-2.6) >>> >>> Thanks, >>> Eran >>> >>> >>> >> >
Re: Problem mixing MESOS Cluster Mode and Docker task execution
For an answer to my question see this: http://stackoverflow.com/a/35660466?noredirect=1. But for your problem, did you define the spark.mesos.docker.home property, or something like that? On Thu, Mar 10, 2016 at 04:26, Eran Chinthaka Withana wrote: > Hi > > I'm also having this issue and can not get the tasks to work inside mesos. > > In my case, the spark-submit command is the following. > > $SPARK_HOME/bin/spark-submit \ > --class com.mycompany.SparkStarter \ > --master mesos://mesos-dispatcher:7077 \ --name SparkStarterJob \ > --driver-memory 1G \ > --executor-memory 4G \ > --deploy-mode cluster \ > --total-executor-cores 1 \ > --conf > spark.mesos.executor.docker.image=echinthaka/mesos-spark:0.23.1-1.6.0-2.6 \ > http://abc.com/spark-starter.jar > > > And the error I'm getting is the following. > > I0310 03:13:11.417009 131594 exec.cpp:132] Version: 0.23.1 > I0310 03:13:11.419452 131601 exec.cpp:206] Executor registered on slave > 20160223-000314-3439362570-5050-631-S0 > sh: 1: /usr/spark-1.6.0-bin-hadoop2.6/bin/spark-class: not found > > > (Looked into Spark JIRA and I found that > https://issues.apache.org/jira/browse/SPARK-11759 is marked as closed > since https://issues.apache.org/jira/browse/SPARK-12345 is marked as > resolved) > > Really appreciate if I can get some help here. > > Thanks, > Eran Chinthaka Withana > > On Wed, Feb 17, 2016 at 2:00 PM, g.eynard.bonte...@gmail.com < > g.eynard.bonte...@gmail.com> wrote: > >> Hi everybody, >> >> I am testing the use of Docker for executing Spark algorithms on MESOS. I >> managed to execute Spark in client mode with executors inside Docker, but >> I >> wanted to go further and have also my Driver running into a Docker >> Container. Here I ran into a behavior that I'm not sure is normal, let me >> try to explain. 
>> >> I submit my spark application through MesosClusterDispatcher using a >> command >> like: >> $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master >> mesos://spark-master-1:7077 --deploy-mode cluster --conf >> spark.mesos.executor.docker.image=myuser/myimage:0.0.2 >> >> https://storage.googleapis.com/some-bucket/spark-examples-1.5.2-hadoop2.6.0.jar >> 10 >> >> My driver is running fine, inside its docker container, but the executors >> fail: >> "sh: /some/spark/home/bin/spark-class: No such file or directory" >> >> Looking on MESOS slaves log, I think that the executors do not run inside >> docker: "docker.cpp:775] No container info found, skipping launch". As my >> Mesos slaves do not have spark installed, it fails. >> >> *It seems that the spark conf that I gave in the first spark-submit is not >> transmitted to the Driver submitted conf*, when launched in the docker >> container. The only workaround I found is to modify my Docker image in >> order >> to define inside its spark conf the spark.mesos.executor.docker.image >> property. This way, my executors get the conf well and are launched inside >> docker on Mesos. This seems a little complicated to me, and I feel the >> configuration passed to the early spark-submit should be transmitted to >> the >> Driver submit... >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-mixing-MESOS-Cluster-Mode-and-Docker-task-execution-tp26258.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> >
Re: Is Spark right for us?
Hi everyone, First thanks for taking some time on your Sunday to reply. Some points in no particular order:
- The feedback from everyone tells me that I have a lot of reading to do first. Thanks for all the pointers.
- The data is currently stored in a row-oriented database (SQL Server 2012 to be precise), but as I said we're open to moving data to a different kind of data store (column-oriented, document, etc.)
- I don't have precise numbers for the size of the database, but I would guess the larger ones have around 100 GB of data. To us, this is huge; obviously, for companies such as Google, it's a second's worth of data.
- For this particular issue, we're talking about ordinal data, not free text fields.
- I agree that Spark is tooling, but I also see it as an implementation of a specific design, namely distributed computing on a distributed data store, if I understand correctly.
- For sure, I would like to avoid introducing a new technology to the mix, so reusing the current infrastructure in a more optimal way would be our first choice.
- Our main issue is that we'd like to be able to scale by distributing instead of adding more memory to this single database. The current computations are done using SQL queries. The data set does not fit in memory. So yes, we could distribute query construction and result aggregation, but the database would still be the bottleneck. That's why I'm wondering if we should investigate technologies such as Spark or Hadoop, but maybe I'm completely mistaken and we can leverage our current infrastructure.
Thanks, GB On Mon, Mar 7, 2016 at 3:05 AM, Jörn Franke <jornfra...@gmail.com> wrote: > I think the Relational Database will be faster for ordinal data (eg where > you answer from 1..x). For free text fields I would recommend solr or > elastic search, because they have a lot more text analytics capabilities > that do not exist in a relational database or MongoDB and are not likely to > be there in the near future. 
> > On 06 Mar 2016, at 18:25, Guillaume Bilodeau <guillaume.bilod...@gmail.com> > wrote: > > The data is currently stored in a relational database, but a migration to > a document-oriented database such as MongoDb is something we are definitely > considering. How does this factor in? > > On Sun, Mar 6, 2016 at 12:23 PM, Gourav Sengupta < > gourav.sengu...@gmail.com> wrote: > >> Hi, >> >> That depends on a lot of things, but as a starting point I would ask >> whether you are planning to store your data in JSON format? >> >> >> Regards, >> Gourav Sengupta >> >> On Sun, Mar 6, 2016 at 5:17 PM, Laumegui Deaulobi < >> guillaume.bilod...@gmail.com> wrote: >> >>> Our problem space is survey analytics. Each survey comprises a set of >>> questions, with each question having a set of possible answers. Survey >>> fill-out tasks are sent to users, who have until a certain date to >>> complete >>> it. Based on these survey fill-outs, reports need to be generated. Each >>> report deals with a subset of the survey fill-outs, and comprises a set >>> of >>> data points (average rating for question 1, min/max for question 2, etc.) >>> >>> We are dealing with rather large data sets - although reading the >>> internet >>> we get the impression that everyone is analyzing petabytes of data... >>> >>> Users: up to 100,000 >>> Surveys: up to 100,000 >>> Questions per survey: up to 100 >>> Possible answers per question: up to 10 >>> Survey fill-outs / user: up to 10 >>> Reports: up to 100,000 >>> Data points per report: up to 100 >>> >>> Data is currently stored in a relational database but a migration to a >>> different kind of store is possible. 
>>> >>> The naive algorithm for report generation can be summed up as this: >>> >>> for each report to be generated { >>> for each report data point to be calculated { >>> calculate data point >>> add data point to report >>> } >>> publish report >>> } >>> >>> In order to deal with the upper limits of these values, we will need to >>> distribute this algorithm to a compute / data cluster as much as >>> possible. >>> >>> I've read about frameworks such as Apache Spark but also Hadoop, >>> GridGain, >>> HazelCast and several others, and am still confused as to how each of >>> these >>> can help us and how they fit together. >>> >>> Is Spark the right framework for us? >>> >>> >>> >>> -- >>> View this message in context: >>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-right-for-us-tp26412.html >>> Sent from the Apache Spark User List mailing list archive at Nabble.com >>> <http://nabble.com>. >>> >>> - >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>> For additional commands, e-mail: user-h...@spark.apache.org >>> >>> >> >
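The naive loop in the quoted question maps naturally onto a data-parallel model, because each report is independent of the others. A minimal sketch (field names, data shapes, and statistics are invented for illustration) of how the outer loop becomes a per-report map, which is the shape Spark parallelizes well:

```python
def compute_data_point(fillouts, spec):
    # One data point = one statistic over one question's answers.
    values = [f[spec["question"]] for f in fillouts if spec["question"] in f]
    if spec["stat"] == "avg":
        return sum(values) / len(values)
    if spec["stat"] == "min":
        return min(values)
    return max(values)

def generate_report(report, fillouts):
    # Pure function of (report spec, fill-outs): trivially parallelizable.
    return {"report_id": report["id"],
            "points": [compute_data_point(fillouts, s) for s in report["specs"]]}

fillouts = [{"q1": 4, "q2": 2}, {"q1": 5, "q2": 1}]
reports = [{"id": 1, "specs": [{"question": "q1", "stat": "avg"},
                               {"question": "q2", "stat": "min"}]}]

# In Spark this would be roughly:
#   sc.parallelize(reports).map(lambda r: generate_report(r, fillouts_broadcast.value))
results = [generate_report(r, fillouts) for r in reports]
```

The harder engineering question is where the fill-outs live: if each report only touches a subset, partitioning fill-outs by report (a join or co-partitioning) avoids broadcasting the whole data set to every worker.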
mllib.recommendations.als recommendForAll not ported to ml?
I have experienced very poor performance with the ALSModel.transform method when feeding it with even a small cartesian product of users x items. The former mllib implementation has a recommendForAll method to return the top-n items per user in an efficient way (using the blockify method to distribute parts of the user and item factors). https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala#L271 I could revert to mllib, but ALS benefits from nice optimizations in ml (https://issues.apache.org/jira/browse/SPARK-3541). Do you guys plan to port recommendForAll to ml? Thanks in advance! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mllib-recommendations-als-recommendForAll-not-ported-to-ml-tp25609.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
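For context, recommendForAll essentially scores every (user, item) pair from the two factor matrices and keeps the top-n per user; the blockify trick only changes how that work is distributed. A pure-Python sketch of the scoring step (the factor values are made up for illustration):

```python
import heapq

# Toy latent-factor matrices: user/item id -> factor vector.
user_factors = {"u1": [1.0, 0.0], "u2": [0.0, 1.0]}
item_factors = {"i1": [0.9, 0.1], "i2": [0.2, 0.8], "i3": [0.5, 0.5]}

def recommend_for_all(users, items, n):
    """For each user, score all items by dot product and keep the top n."""
    recs = {}
    for user, uf in users.items():
        scores = ((sum(a * b for a, b in zip(uf, itf)), item)
                  for item, itf in items.items())
        recs[user] = [item for _, item in heapq.nlargest(n, scores)]
    return recs

recs = recommend_for_all(user_factors, item_factors, 2)
```

Doing this through transform on a cartesian-product DataFrame forces a row per pair through the whole pipeline, which is why the mllib approach of batched matrix products over factor blocks is so much faster.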
Fwd: pyspark: Error when training a GMM with an initial GaussianMixtureModel
Hi all, We're trying to train a Gaussian Mixture Model (GMM) with a specified initial model. The 1.5.1 docs say we should use a GaussianMixtureModel object as input for the "initialModel" parameter to the GaussianMixture.train method. Before creating our own initial model (the plan is to use a KMeans result for instance), we simply wanted to test this scenario. So we try to initialize a 2nd training using the GaussianMixtureModel from the output of a 1st training. But this trivial scenario throws an error. Could you please help us determine what's going on here? Thanks a lot guillaume PS: we run (py)spark 1.5.1 with hadoop 2.6 Below is the trivial scenario code and the error:

SOURCE CODE

from pyspark.mllib.clustering import GaussianMixture
from numpy import array
import sys
import os
import pyspark

### Local default options
K = 2                  # "k" (int) Set the number of Gaussians in the mixture model. Default: 2
convergenceTol = 1e-3  # "convergenceTol" (double) Set the largest change in log-likelihood at which convergence is considered to have occurred.
maxIterations = 100    # "maxIterations" (int) Set the maximum number of iterations to run. Default: 100
seed = None            # "seed" (long) Set the random seed
initialModel = None

### Load and parse the sample data
data = sc.textFile("gmm_data.txt")  # Data from the dummy set here: data/mllib/gmm_data.txt
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))
print type(parsedData)
print type(parsedData.first())

### 1st training: Build the GMM
gmm = GaussianMixture.train(parsedData, K, convergenceTol, maxIterations, seed, initialModel)

# output parameters of model
for i in range(2):
    print ("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu, "sigma = ", gmm.gaussians[i].sigma.toArray())

### 2nd training: Re-build a GMM using an initial model
initialModel = gmm
print type(initialModel)
gmm = GaussianMixture.train(parsedData, K, convergenceTol, maxIterations, seed, initialModel)

OUTPUT WITH ERROR:

('weight = ', 0.51945003367044018, 'mu = ', DenseVector([-0.1045, 0.0429]), 'sigma = ', array([[ 4.90706817, -2.00676881], [-2.00676881, 1.01143891]]))
('weight = ', 0.48054996632955982, 'mu = ', DenseVector([0.0722, 0.0167]), 'sigma = ', array([[ 4.77975653, 1.87624558], [ 1.87624558, 0.91467242]]))

---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
in ()
     33 initialModel = gmm
     34 print type(initialModel)
---> 35 gmm = GaussianMixture.train(parsedData, K, convergenceTol, maxIterations, seed, initialModel)

/opt/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k, convergenceTol, maxIterations, seed, initialModel)
    306 java_model = callMLlibFunc("trainGaussianMixtureModel", rdd.map(_convert_to_vector),
    307     k, convergenceTol, maxIterations, seed,
--> 308     initialModelWeights, initialModelMu, initialModelSigma)
    309 return GaussianMixtureModel(java_model)
    310

/opt/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/mllib/common.pyc in callMLlibFunc(name, *args)
    128 sc = SparkContext._active_spark_context
    129 api = getattr(sc._jvm.PythonMLLibAPI(), name)
--> 130 return callJavaFunc(sc, api, *args)
    131
    132 
/opt/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args)
    120 def callJavaFunc(sc, func, *args):
    121     """ Call Java Function """
--> 122     args = [_py2java(sc, a) for a in args]
    123     return _java2py(sc, func(*args))
    124

/opt/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/mllib/common.pyc in _py2java(sc, obj)
     86     else:
     87         data = bytearray(PickleSerializer().dumps(obj))
---> 88     obj = sc._jvm.SerDe.loads(data)
     89     return obj
     90

/opt/spark/spark-1.5.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536 answer = self.gateway_client.send_command(command)
    537 return_value = get_return_value(answer, self.gateway_client,
--> 538     self.target_id, self.name)
    539
    540 for temp_arg in temp_args:

/opt/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     34 def deco(*a, **kw):
     35     try:
---> 36         return f(*a, **kw)
     37     except py4j.protocol.Py4JJavaError as e:
     38         s = e.java_exception.toString()

/opt/spark/spark-1.5.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_cl
Model Save function (ML-Lib)
Hi: I'm using pyspark 1.3 and it seems that model.save is not implemented for every model. Here is what I have so far:

Model Name            Model Class                  save available
Logistic Regression   LogisticRegressionModel      NO
Random Forest         TreeEnsembleModel            OK
GBM                   GradientBoostedTreesModel    OK
SVM                   SVMModel                     NO

What is the recommended route to save a logistic regression or an SVM model? I tried to pickle the SVM model but it failed when loading it back. Any advice appreciated. Thanks! Best, Guillaume Guy +1 919 - 972 - 8750
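One possible workaround (a sketch, not an official API): for linear models, the learned state is just the weights and the intercept, so you can persist those yourself and rebuild the model at load time. The values below are made up; in real code they would come from model.weights and model.intercept, and you would rebuild via the model class constructor:

```python
import json
import os
import tempfile

def save_linear_model(path, weights, intercept):
    # Persist only the learned parameters as plain JSON.
    with open(path, "w") as f:
        json.dump({"weights": list(weights), "intercept": intercept}, f)

def load_linear_model(path):
    # Read the parameters back; caller reconstructs the model object,
    # e.g. LogisticRegressionModel(weights, intercept) or SVMModel(weights, intercept).
    with open(path) as f:
        d = json.load(f)
    return d["weights"], d["intercept"]

path = os.path.join(tempfile.mkdtemp(), "lr_model.json")
save_linear_model(path, [0.5, -1.2], 0.1)
weights, intercept = load_linear_model(path)
```

This sidesteps pickling the model object itself, which drags in non-serializable JVM gateway references.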
Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?
Hi, So I've done this Node-centered accumulator, I've written a small piece about it : http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/ Hope it can help someone Guillaume 2015-06-18 15:17 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com: I was thinking exactly the same. I'm going to try it, It doesn't really matter if I lose an executor, since its sketch will be lost, but then reexecuted somewhere else. I mean that between the action that will update the sketches and the action to collect/merge them you can loose an executor. So outside of sparks control. But it's probably an acceptable risk. And anyway, it's an approximate data structure, and what matters are ratios, not exact values. I mostly need to take care of the concurrency problem for my sketch. I think you could do something like: - Have this singleton that holds N sketch instances (one for each executor core) - Inside an operation over partitions (like forEachPartition/mapPartitions) - at the begin you ask the singleton to provide you with one instance to use, in a sync. fashion and pick it out from the N available instances or mark it as in use - when the iterator over the partition doesn't have more elements then you release this sketch - Then you can do something like sc.parallelize(Seq(...)).coalesce(numExecutors).map(pickTheSketches).reduce(blabla), but you will have to make sure that this will be executed over each executor (not sure if a dataset than executor num, will trigger an action on every executor) Please let me know what you end up doing, sounds interesting :) Eugen Guillaume Yeah thats the problem. There is probably some perfect num of partitions that provides the best balance between partition size and memory and merge overhead. Though it's not an ideal solution :( There could be another way but very hacky... for example if you store one sketch in a singleton per jvm (so per executor). 
Do a first pass over your data and update those. Then you trigger some other dummy operation that will just retrieve the sketches. Thats kind of a hack but should work. Note that if you loose an executor in between, then that doesn't work anymore, probably you could detect it and recompute the sketches, but it would become over complicated. 2015-06-18 14:27 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com: Hi, Thank you for this confirmation. Coalescing is what we do now. It creates, however, very big partitions. Guillaume Hey, I am not 100% sure but from my understanding accumulators are per partition (so per task as its the same) and are sent back to the driver with the task result and merged. When a task needs to be run n times (multiple rdds depend on this one, some partition loss later in the chain etc) then the accumulator will count n times the values from that task. So in short I don't think you'd win from using an accumulator over what you are doing right now. You could maybe coalesce your rdd to num-executors without a shuffle and then update the sketches. You should endup with 1 partition per executor thus 1 sketch per executor. You could then increase the number of threads per task if you can use the sketches concurrently. Eugen 2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com: Hi, I'm trying to figure out the smartest way to implement a global count-min-sketch on accumulators. For now, we are doing that with RDDs. It works well, but with one sketch per partition, merging takes too long. As you probably know, a count-min sketch is a big mutable array of array of ints. To distribute it, all sketches must have the same size. Since it can be big, and since merging is not free, I would like to minimize the number of sketches and maximize reuse and conccurent use of the sketches. Ideally, I would like to just have one sketch per worker. 
I think accumulables might be the right structures for that, but it seems that they are not shared between executors, or even between tasks. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala (289) /** * This thread-local map holds per-task copies of accumulators; it is used to collect the set * of accumulator updates to send back to the driver when tasks complete. After tasks complete
Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?
Hi, Thank you for this confirmation. Coalescing is what we do now. It creates, however, very big partitions. Guillaume Hey, I am not 100% sure but from my understanding accumulators are per partition (so per task as its the same) and are sent back to the driver with the task result and merged. When a task needs to be run n times (multiple rdds depend on this one, some partition loss later in the chain etc) then the accumulator will count n times the values from that task. So in short I don't think you'd win from using an accumulator over what you are doing right now. You could maybe coalesce your rdd to num-executors without a shuffle and then update the sketches. You should endup with 1 partition per executor thus 1 sketch per executor. You could then increase the number of threads per task if you can use the sketches concurrently. Eugen 2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com: Hi, I'm trying to figure out the smartest way to implement a global count-min-sketch on accumulators. For now, we are doing that with RDDs. It works well, but with one sketch per partition, merging takes too long. As you probably know, a count-min sketch is a big mutable array of array of ints. To distribute it, all sketches must have the same size. Since it can be big, and since merging is not free, I would like to minimize the number of sketches and maximize reuse and conccurent use of the sketches. Ideally, I would like to just have one sketch per worker. I think accumulables might be the right structures for that, but it seems that they are not shared between executors, or even between tasks. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala (289) /** * This thread-local map holds per-task copies of accumulators; it is used to collect the set * of accumulator updates to send back to the driver when tasks complete. 
After tasks complete, * this map is cleared by `Accumulators.clear()` (see Executor.scala). */ private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() { override protected def initialValue() = Map[Long, Accumulable[_, _]]() } The localAccums stores an accumulator for each task (it's thread local, so I assume each task have a unique thread on executors) If I understand correctly, each time a task starts, an accumulator is initialized locally, updated, then sent back to the driver for merging ? So I guess, accumulators may not be the way to go, finally. Any advice ? Guillaume -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705 -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?
I was thinking exactly the same. I'm going to try it, It doesn't really matter if I lose an executor, since its sketch will be lost, but then reexecuted somewhere else. And anyway, it's an approximate data structure, and what matters are ratios, not exact values. I mostly need to take care of the concurrency problem for my sketch. Guillaume Yeah thats the problem. There is probably some perfect num of partitions that provides the best balance between partition size and memory and merge overhead. Though it's not an ideal solution :( There could be another way but very hacky... for example if you store one sketch in a singleton per jvm (so per executor). Do a first pass over your data and update those. Then you trigger some other dummy operation that will just retrieve the sketches. Thats kind of a hack but should work. Note that if you loose an executor in between, then that doesn't work anymore, probably you could detect it and recompute the sketches, but it would become over complicated. 2015-06-18 14:27 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com: Hi, Thank you for this confirmation. Coalescing is what we do now. It creates, however, very big partitions. Guillaume Hey, I am not 100% sure but from my understanding accumulators are per partition (so per task as its the same) and are sent back to the driver with the task result and merged. When a task needs to be run n times (multiple rdds depend on this one, some partition loss later in the chain etc) then the accumulator will count n times the values from that task. So in short I don't think you'd win from using an accumulator over what you are doing right now. You could maybe coalesce your rdd to num-executors without a shuffle and then update the sketches. You should endup with 1 partition per executor thus 1 sketch per executor. You could then increase the number of threads per task if you can use the sketches concurrently. 
Eugen 2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com: Hi, I'm trying to figure out the smartest way to implement a global count-min-sketch on accumulators. For now, we are doing that with RDDs. It works well, but with one sketch per partition, merging takes too long. As you probably know, a count-min sketch is a big mutable array of array of ints. To distribute it, all sketches must have the same size. Since it can be big, and since merging is not free, I would like to minimize the number of sketches and maximize reuse and conccurent use of the sketches. Ideally, I would like to just have one sketch per worker. I think accumulables might be the right structures for that, but it seems that they are not shared between executors, or even between tasks. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala (289) /** * This thread-local map holds per-task copies of accumulators; it is used to collect the set * of accumulator updates to send back to the driver when tasks complete. After tasks complete, * this map is cleared by `Accumulators.clear()` (see Executor.scala). */ private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() { override protected def initialValue() = Map[Long, Accumulable[_, _]]() } The localAccums stores an accumulator for each task (it's thread local, so I assume each task have a unique thread on executors) If I understand correctly, each time a task starts, an accumulator is initialized locally, updated, then sent back to the driver for merging ? So I guess, accumulators may not be the way to go, finally. Any advice ? Guillaume -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705 -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. 
http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705 -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Re: Best way to randomly distribute elements
I think you can randomly reshuffle your elements just by emitting a random key (mapping a PairRDD's key triggers a reshuffle IIRC):

yourrdd.map{ x => (rand(), x) }

There is obviously a risk that rand() will give the same sequence of numbers in each partition, so you may need to use mapPartitionsWithIndex first and seed your rand with the partition id (or compute your rand from a seed based on x). Guillaume

Hello, In the context of a machine learning algorithm, I need to be able to randomly distribute the elements of a large RDD across partitions (i.e., essentially assign each element to a random partition). How could I achieve this? I have tried to call repartition() with the current number of partitions - but it seems to me that this moves only some of the elements, and in a deterministic way. I know this will be an expensive operation but I only need to perform it every once in a while. Thanks a lot!

-- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
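The seeding concern above can be illustrated without Spark. Below is a minimal pure-Python sketch (plain lists stand in for partitions, and the function name `assign_random_keys` is illustrative, not a Spark API) of why seeding each partition's RNG with the partition index, mapPartitionsWithIndex-style, avoids identical key sequences across partitions:

```python
import random

def assign_random_keys(partitions):
    # Seed each partition's RNG with its index so the key
    # sequences differ across partitions.
    keyed = []
    for idx, part in enumerate(partitions):
        rng = random.Random(idx)
        keyed.append([(rng.random(), x) for x in part])
    return keyed

partitions = [[1, 2, 3], [4, 5, 6]]
keyed = assign_random_keys(partitions)
keys0 = [k for k, _ in keyed[0]]
keys1 = [k for k, _ in keyed[1]]
# Different seeds give different key sequences, unlike an unseeded
# rand() that could replay the same sequence in every partition.
print(keys0 != keys1)  # True
```

Sorting or hash-partitioning by the random key then produces the reshuffle.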
Accumulators / Accumulables : thread-local, task-local, executor-local ?
Hi,

I'm trying to figure out the smartest way to implement a global count-min sketch on accumulators. For now, we are doing that with RDDs. It works well, but with one sketch per partition, merging takes too long.

As you probably know, a count-min sketch is a big mutable array of arrays of ints. To distribute it, all sketches must have the same size. Since it can be big, and since merging is not free, I would like to minimize the number of sketches and maximize reuse and concurrent use of the sketches. Ideally, I would like to just have one sketch per worker.

I think accumulables might be the right structures for that, but it seems that they are not shared between executors, or even between tasks.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala (289)

/** * This thread-local map holds per-task copies of accumulators; it is used to collect the set * of accumulator updates to send back to the driver when tasks complete. After tasks complete, * this map is cleared by `Accumulators.clear()` (see Executor.scala). */ private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() { override protected def initialValue() = Map[Long, Accumulable[_, _]]() }

localAccums stores an accumulator for each task (it's thread-local, so I assume each task has a unique thread on executors). If I understand correctly, each time a task starts, an accumulator is initialized locally, updated, then sent back to the driver for merging? So I guess accumulators may not be the way to go, finally. Any advice?

Guillaume -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
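For reference, merging two count-min sketches of identical dimensions is just a cell-by-cell sum, which is why all sketches must share the same size. Here is a minimal pure-Python sketch of the structure (class and method names are illustrative, not from any Spark or library API):

```python
import hashlib

class CountMinSketch:
    def __init__(self, depth=4, width=16):
        self.depth, self.width = depth, width
        self.table = [[0] * width for _ in range(depth)]

    def _bucket(self, item, row):
        # One hash function per row, derived from the row index.
        h = hashlib.md5(f"{row}:{item}".encode()).hexdigest()
        return int(h, 16) % self.width

    def add(self, item, count=1):
        for row in range(self.depth):
            self.table[row][self._bucket(item, row)] += count

    def estimate(self, item):
        # The count-min estimate is the minimum over the rows.
        return min(self.table[row][self._bucket(item, row)]
                   for row in range(self.depth))

    def merge(self, other):
        # Only sketches with identical dimensions can be merged:
        # the merge is a cell-by-cell sum of the two tables.
        assert (self.depth, self.width) == (other.depth, other.width)
        for row in range(self.depth):
            for col in range(self.width):
                self.table[row][col] += other.table[row][col]
        return self

a, b = CountMinSketch(), CountMinSketch()
a.add("hello", 3)
b.add("hello", 2)
a.merge(b)
print(a.estimate("hello"))  # 5
```

The per-partition/per-worker question in the email is about how many such tables exist before this merge has to run.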
Re: Random pairs / RDD order
Hi Aurelien, Sean's solution is nice, but maybe not completely order-free, since pairs will come from the same partition. The easiest / fastest way to do it in my opinion is to use a random key instead of a zipWithIndex. Of course you'll not be able to ensure uniqueness of each element of the pairs, but maybe you don't care since you're sampling with replacement already?

val a = rdd.sample(...).map{ x => (rand() % k, x) }
val b = rdd.sample(...).map{ x => (rand() % k, x) }

k must be ~ the number of elements you're sampling. You'll have a skewed distribution due to collisions, but I don't think it should hurt too much. Guillaume

Hi everyone, However I am not happy with this solution because each element is most likely to be paired with elements that are close by in the partition. This is because sample returns an ordered Iterator.

-- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
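The random-key trick above can be sketched in plain Python: two independent samples are keyed by a random integer in [0, k) and then joined on the key, so pairing no longer depends on partition order. This is a hedged illustration (the function name and parameters are mine, not from any API), with plain lists standing in for RDDs:

```python
import random
from collections import defaultdict

def random_pairs(data, n_samples, k, seed=0):
    """Pair two independent samples by a random key in [0, k),
    mimicking the keyed-join trick (pure Python, no Spark)."""
    rng = random.Random(seed)
    a = [(rng.randrange(k), x) for x in rng.choices(data, k=n_samples)]
    b = [(rng.randrange(k), x) for x in rng.choices(data, k=n_samples)]
    by_key = defaultdict(list)
    for key, x in b:
        by_key[key].append(x)
    # Join: each element of `a` is paired with every b-element sharing
    # its key; key collisions are what skew the pair distribution.
    return [(x, y) for key, x in a for y in by_key[key]]

pairs = random_pairs(range(100), n_samples=10, k=10)
print(len(pairs))
```

With k on the order of the sample size, most keys carry zero or one element on each side, so the output is close to a uniform random pairing.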
Re: Is the disk space in SPARK_LOCAL_DIRS cleaned up?
Right, I remember now, the only problematic case is when things go bad and the cleaner is not executed. Also, it can be a problem when reusing the same SparkContext for many runs. Guillaume It cleans the work dir, and SPARK_LOCAL_DIRS should be cleaned automatically. From the source code comments: // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the // application finishes. On 13.04.2015, at 11:26, Guillaume Pitel guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com wrote: Does it also clean up the spark local dirs? I thought it was only cleaning $SPARK_HOME/work/ Guillaume I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example: export SPARK_WORKER_OPTS=-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=seconds On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com mailto:ningjun.w...@lexisnexis.com wrote: Does anybody have an answer for this? Thanks Ningjun *From:* Wang, Ningjun (LNG-NPV) *Sent:* Thursday, April 02, 2015 12:14 PM *To:* user@spark.apache.org mailto:user@spark.apache.org *Subject:* Is the disk space in SPARK_LOCAL_DIRS cleaned up? I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are shuffled, spark writes to this folder. I found that the disk space of this folder keeps increasing quickly and at a certain point I will run out of disk space. I wonder, does Spark clean up the disk space in this folder once the shuffle operation is done? If not, I need to write a job to clean it up myself. But how do I know which sub folders can be removed? Ningjun -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Re: Spark Cluster: RECEIVED SIGNAL 15: SIGTERM
That's why I think it's the OOM killer. There are several cases of memory overuse / errors:

1 - The application tries to allocate more than the heap limit and GC cannot free more memory => OutOfMemoryError: Java heap space exception from the JVM.

2 - The JVM is configured with a max heap size larger than the available memory. At some point the application needs to allocate memory in the JVM, the JVM tries to extend its heap and allocate real memory (or maybe the OS is configured with overcommit virtual memory), but fails => OutOfMemoryError: Kill process or sacrifice child (or others, depending on various factors: https://plumbr.eu/outofmemoryerror).

3 - The JVM has allocated its memory from the beginning and it has been served, but other processes start starving from memory shortage, the pressure on memory grows beyond the threshold configured in the OOM killer, and boom, the java process is selected for sacrifice because it is the main culprit of memory consumption.

Guillaume

Linux OOM throws SIGTERM, but if I remember correctly the JVM handles heap memory limits differently and throws OutOfMemoryError and eventually sends SIGINT. Not sure what happened, but the worker simply received a SIGTERM signal, so perhaps the daemon was terminated by someone or a parent process. Just my guess.
Tim On Mon, Apr 13, 2015 at 2:28 AM, Guillaume Pitel guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com wrote: Very likely to be this: http://www.linuxdevcenter.com/pub/a/linux/2006/11/30/linux-out-of-memory.html?page=2 Your worker ran out of memory => maybe you're asking for too much memory for the JVM, or something else is running on the worker. Guillaume Any idea what this means, many thanks == logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1 == 15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910 with 4 cores, 6.6 GB RAM 15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0 15/04/13 07:07:22 INFO Worker: Spark home: /remote/users//work/tools/spark-1.3.0-bin-hadoop2.4 15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT 15/04/13 07:07:22 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081 15/04/13 07:07:22 INFO Utils: Successfully started service 'WorkerUI' on port 8081. 15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at http://09:8081 15/04/13 07:07:22 INFO Worker: Connecting to master akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master... 15/04/13 07:07:22 INFO Worker: Successfully registered with master spark://08:7077 *15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM* -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Re: Spark Cluster: RECEIVED SIGNAL 15: SIGTERM
Very likely to be this: http://www.linuxdevcenter.com/pub/a/linux/2006/11/30/linux-out-of-memory.html?page=2 Your worker ran out of memory => maybe you're asking for too much memory for the JVM, or something else is running on the worker. Guillaume Any idea what this means, many thanks == logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1 == 15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910 with 4 cores, 6.6 GB RAM 15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0 15/04/13 07:07:22 INFO Worker: Spark home: /remote/users//work/tools/spark-1.3.0-bin-hadoop2.4 15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT 15/04/13 07:07:22 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081 15/04/13 07:07:22 INFO Utils: Successfully started service 'WorkerUI' on port 8081. 15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at http://09:8081 15/04/13 07:07:22 INFO Worker: Connecting to master akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master... 15/04/13 07:07:22 INFO Worker: Successfully registered with master spark://08:7077 *15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM* -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Re: Is the disk space in SPARK_LOCAL_DIRS cleaned up?
Does it also clean up the spark local dirs? I thought it was only cleaning $SPARK_HOME/work/ Guillaume I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example: export SPARK_WORKER_OPTS=-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=seconds On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com mailto:ningjun.w...@lexisnexis.com wrote: Does anybody have an answer for this? Thanks Ningjun *From:* Wang, Ningjun (LNG-NPV) *Sent:* Thursday, April 02, 2015 12:14 PM *To:* user@spark.apache.org mailto:user@spark.apache.org *Subject:* Is the disk space in SPARK_LOCAL_DIRS cleaned up? I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are shuffled, spark writes to this folder. I found that the disk space of this folder keeps increasing quickly and at a certain point I will run out of disk space. I wonder, does Spark clean up the disk space in this folder once the shuffle operation is done? If not, I need to write a job to clean it up myself. But how do I know which sub folders can be removed? Ningjun -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Re: Is the disk space in SPARK_LOCAL_DIRS cleaned up?
Hi,

I had to set up a cron job for cleanup in $SPARK_HOME/work and in $SPARK_LOCAL_DIRS. Here are the cron lines. Unfortunately they're for *nix machines; I guess you will have to adapt them seriously for Windows.

12 * * * * find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
32 * * * * find /tmp -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+
52 * * * * find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+

They remove directories older than a day. The cron has to be set up both on the executors AND on the driver (the spark local dir of the driver can be heavily used if using a lot of broadcasts). I think in recent versions of Spark, $SPARK_HOME/work is correctly cleaned up, but adding a cron won't hurt.

Guillaume

Does anybody have an answer for this? Thanks Ningjun *From:* Wang, Ningjun (LNG-NPV) *Sent:* Thursday, April 02, 2015 12:14 PM *To:* user@spark.apache.org *Subject:* Is the disk space in SPARK_LOCAL_DIRS cleaned up? I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are shuffled, spark writes to this folder. I found that the disk space of this folder keeps increasing quickly and at a certain point I will run out of disk space. I wonder, does Spark clean up the disk space in this folder once the shuffle operation is done? If not, I need to write a job to clean it up myself. But how do I know which sub folders can be removed? Ningjun -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Re: Join on Spark too slow.
Maybe I'm wrong, but what you are doing here is basically a bunch of cartesian products, one for each key. So if "hello" appears 100 times in your corpus, it will produce 100*100 elements in the join output. I don't understand what you're trying to do here, but it's normal that your join takes forever; it makes no sense as is, IMO. Guillaume

Hello guys, I am trying to run the following dummy example for Spark, on a dataset of 250MB, using 5 machines with 10GB RAM each, but the join seems to be taking too long (> 2hrs). I am using Spark 0.8.0 but I have also tried the same example on more recent versions, with the same results. Do you have any idea why this is happening? Thanks a lot, Kostas

val sc = new SparkContext( args(0), "DummyJoin", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val file = sc.textFile(args(1))
val wordTuples = file .flatMap(line => line.split(args(2))) .map(word => (word, 1))
val big = wordTuples.filter { case ((k, v)) => k != "a" }.cache()
val small = wordTuples.filter { case ((k, v)) => k != "a" && k != "to" && k != "and" }.cache()
val res = big.leftOuterJoin(small)
res.saveAsTextFile(args(3)) }

-- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
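The blow-up Guillaume describes is easy to quantify without Spark: a key-equality join emits, for each key, the product of its occurrence counts on the two sides. A small pure-Python sketch (the function name is mine, for illustration only):

```python
from collections import Counter

def join_output_size(left_keys, right_keys):
    """Number of pairs a key-equality join emits:
    sum over keys of count_left(k) * count_right(k)."""
    lc, rc = Counter(left_keys), Counter(right_keys)
    return sum(n * rc[k] for k, n in lc.items())

# A word appearing 100 times on each side alone yields 100 * 100 pairs.
left = ["hello"] * 100 + ["world"]
right = ["hello"] * 100 + ["world"]
print(join_output_size(left, right))  # 10001
```

Since word frequencies follow a power law, joining word-count tuples on the raw word key produces quadratically many pairs for common words, which is why the job in the email crawls.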
Re: Pairwise computations within partition
I would try something like that:

val a = rdd.sample(false, 0.1, 1).zipWithIndex.map { case (vector, index) => (index, vector) }
val b = rdd.sample(false, 0.1, 2).zipWithIndex.map { case (vector, index) => (index, vector) }
a.join(b).map { case (_, (vectora, vectorb)) => yourOperation }

Grouping by blocks is probably not what you want, since it would restrict the scope of a vector to the vectors in the same block. Guillaume

Hello everyone, I am a Spark novice facing a nontrivial problem to solve with Spark. I have an RDD consisting of many elements (say, 60K), where each element is a d-dimensional vector. I want to implement an iterative algorithm which does the following. At each iteration, I want to apply an operation on *pairs* of elements (say, compute their dot product). Of course the number of pairs is huge, but I only need to consider a small random subset of the possible pairs at each iteration. To minimize communication between nodes, I am willing to partition my RDD by key (where each element gets a random key) and to only consider pairs of elements that belong to the same partition (i.e., that share the same key). But I am not sure how to sample and apply the operation on pairs, and how to make sure that the computation for each pair is indeed done by the node holding the corresponding elements. Any help would be greatly appreciated. Thanks a lot!

-- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
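The index-join idea maps to plain Python as zipping two equally sized, independently shuffled samples and applying the pairwise operation to elements that share a position. A hedged sketch (function name and parameters are illustrative; the dot product stands in for "yourOperation"):

```python
import random

def sampled_dot_products(vectors, fraction, seed_a=1, seed_b=2):
    # Two independent samples with different seeds, as in the two
    # rdd.sample(...) calls; zip pairs them up by implicit index,
    # playing the role of zipWithIndex + join.
    rng_a, rng_b = random.Random(seed_a), random.Random(seed_b)
    n = max(1, int(len(vectors) * fraction))
    a = rng_a.sample(vectors, n)
    b = rng_b.sample(vectors, n)
    return [sum(x * y for x, y in zip(va, vb)) for va, vb in zip(a, b)]

vectors = [[1.0, 2.0], [3.0, 4.0], [0.5, 0.5]]
print(sampled_dot_products(vectors, fraction=1.0))
```

In Spark the join keeps each pair's computation on the node holding the keyed partition, which is the locality property the question asks about.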
Re: Incremently load big RDD file into Memory
Hi Muhammad,

There are lots of ways to do it. My company actually develops a text mining solution which embeds a very fast approximate neighbours solution (a demo with real-time queries on the Wikipedia dataset can be seen at wikinsights.org). For the record, we now prepare a dataset of 4.5 million documents for querying in about 2 or 3 minutes on a 32-core cluster, and the queries take less than 10ms when the dataset is in memory.

But if you just want to precompute everything and don't mind waiting a few tens of minutes (or hours), and don't want to bother with an approximate neighbour solution, then the best way is probably something like this:

1 - Block your data (i.e. group your items in X large groups). Instead of a dataset of N elements, you should now have a dataset of X blocks containing N/X elements each.
2 - Do the cartesian product (instead of N*N elements, you now have just X*X blocks, which should take less memory).
3 - For each pair of blocks (blockA, blockB), perform the computation of distances for each element of blockA with each element of blockB, but keep only the top K best for each element of blockA. Output is List((elementOfBlockA, listOfKNearestElementsOfBlockBWithTheDistance), ...)
4 - reduceByKey (the key is the elementOfBlockA), by merging the listOfNearestElements and always keeping the K nearest.

This is an exact version of top K. This is only interesting if K << N/X, but even if K is large, it is possible that it will fit your needs. Remember that you will still compute N*N distances (this is the problem with exact nearest neighbours); the only difference with what you're doing now is that you produce fewer items and duplicate less data. Indeed, if one of your elements takes 100 bytes, the per-element cartesian will produce N*N*100*2 bytes, while the blocked version will produce X*X*100*2*N/X, i.e. X*N*100*2 bytes.

Guillaume

Hi Guillaume, Thanks for your reply. Can you please tell me how I can improve for top-K nearest points. P.S.
My post was not accepted on the list, that's why I am sending you this email here. I would be really grateful if you reply. Thanks, On Wed, Apr 8, 2015 at 1:23 PM, Guillaume Pitel guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com wrote: This kind of operation is not scalable, no matter what you do, at least if you _really_ want to do that. However, if what you're looking for is not to really compute all distances (for instance if you're looking only for the top K nearest points), then it can be highly improved. It all depends on what you want to do eventually. Guillaume val locations = filelines.map(line => line.split("\t")).map(t => (t(5).toLong, (t(2).toDouble, t(3).toDouble))).distinct().collect() val cartesienProduct = locations.cartesian(locations).map(t => Edge(t._1._1, t._2._1, distanceAmongPoints(t._1._2._1, t._1._2._2, t._2._2._1, t._2._2._2))) Code executes perfectly fine up till here, but when I try to use cartesienProduct it gets stuck, i.e. val count = cartesienProduct.count() Any help to efficiently do this will be highly appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Incremently-load-big-RDD-file-into-Memory-tp22410.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705 -- Regards, Muhammad Aamir /CONFIDENTIALITY: This email is intended solely for the person(s) named and may be confidential and/or privileged. If you are not the intended recipient, please delete it, notify me and do not copy, use, or disclose its content./ -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S.
http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
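The four steps above can be sketched in pure Python on tiny data (Euclidean distance; the function name and blocking scheme are illustrative, not from any library):

```python
import math
from itertools import product

def blocked_top_k(points, num_blocks, k):
    # 1 - block the data: X groups of ~N/X elements each
    blocks = [points[i::num_blocks] for i in range(num_blocks)]
    partial = {}
    # 2 - cartesian product of the X*X block pairs
    for block_a, block_b in product(blocks, blocks):
        for p in block_a:
            # 3 - distances of p to every element of block_b; keep top K
            cands = sorted((math.dist(p, q), q) for q in block_b if q != p)[:k]
            # 4 - reduce by key: merge candidate lists, keep K nearest overall
            partial[p] = sorted(partial.get(p, []) + cands)[:k]
    return partial

pts = [(0.0, 0.0), (1.0, 0.0), (5.0, 5.0), (0.0, 1.0)]
top1 = blocked_top_k(pts, num_blocks=2, k=1)
print(top1[(0.0, 0.0)][0][0])  # 1.0, the distance to the origin's nearest neighbour
```

All N*N distances are still computed, exactly as the email warns; the blocking only shrinks the number of intermediate records from N*N pairs to X*X block pairs.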
Re: Spark-submit and multiple files
Hi Davies, I am already using --py-files. The system does use the other file. The error I am getting is not trivial. Please check the error log. On Thu, Mar 19, 2015 at 8:03 PM, Davies Liu dav...@databricks.com wrote: You could submit additional Python source via --py-files, for example: $ bin/spark-submit --py-files work.py main.py On Tue, Mar 17, 2015 at 3:29 AM, poiuytrez guilla...@databerries.com wrote: Hello guys, I am having a hard time understanding how spark-submit behaves with multiple files. I have created two code snippets. Each code snippet is composed of a main.py and work.py. The code works if I paste work.py then main.py in a pyspark shell. However both snippets do not work when using spark-submit, and they generate different errors. Function add_1 definition outside: http://www.codeshare.io/4ao8B https://justpaste.it/jzvj Embedded add_1 function definition: http://www.codeshare.io/OQJxq https://justpaste.it/jzvn I am looking for a way to make it work. Thank you for your support. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-and-multiple-files-tp22097.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Error when using multiple python files spark-submit
I see. I will try the other way around. On Thu, Mar 19, 2015 at 8:06 PM, Davies Liu dav...@databricks.com wrote: the options of spark-submit should come before main.py, or they will become the options of main.py, so it should be: ../hadoop/spark-install/bin/spark-submit --py-files /home/poiuytrez/naive.py,/home/poiuytrez/processing.py,/home/poiuytrez/settings.py --master spark://spark-m:7077 main.py On Mon, Mar 16, 2015 at 4:11 AM, poiuytrez guilla...@databerries.com wrote: I have a spark app which is composed of multiple files. When I launch Spark using: ../hadoop/spark-install/bin/spark-submit main.py --py-files /home/poiuytrez/naive.py,/home/poiuytrez/processing.py,/home/poiuytrez/settings.py --master spark://spark-m:7077 I am getting an error: 15/03/13 15:54:24 INFO TaskSetManager: Lost task 6.3 in stage 413.0 (TID 5817) on executor spark-w-3.c.databerries.internal: org.apache.spark.api.python.PythonException (Traceback (most recent call last): File /home/hadoop/spark-install/python/pyspark/worker.py, line 90, in main command = pickleSer._read_with_length(infile) File /home/hadoop/spark-install/python/pyspark/serializers.py, line 151, in _read_with_length return self.loads(obj) File /home/hadoop/spark-install/python/pyspark/serializers.py, line 396, in loads return cPickle.loads(obj) ImportError: No module named naive It is weird because I do not serialize anything. naive.py is also available on every machine at the same path. Any insight on what could be going on? The issue does not happen on my laptop. PS : I am using Spark 1.2.0. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-using-multiple-python-files-spark-submit-tp22080.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Speed Benchmark
Sorry for the confusion. All are running Hadoop services. Node 1 is the namenode whereas Nodes 2 and 3 are datanodes. Best, Guillaume Guy * +1 919 - 972 - 8750* On Sat, Feb 28, 2015 at 1:09 AM, Sean Owen so...@cloudera.com wrote: Is machine 1 the only one running an HDFS data node? You describe it as one running Hadoop services. On Feb 27, 2015 9:44 PM, Guillaume Guy guillaume.c@gmail.com wrote: Hi Jason: Thanks for your feedback. Beside the information above I mentioned, there are 3 machines in the cluster. *1st one*: Driver + has a bunch of Hadoop services. 32GB of RAM, 8 cores (2 used) *2nd + 3rd:* 16GB of RAM, 4 cores (2 used each) I hope this helps clarify. Thx. GG Best, Guillaume Guy * +1 919 - 972 - 8750* On Fri, Feb 27, 2015 at 9:06 AM, Jason Bell jaseb...@gmail.com wrote: How many machines are on the cluster? And what is the configuration of those machines (Cores/RAM)? "Small cluster" is a very subjective statement. Guillaume Guy wrote: Dear Spark users: I want to see if anyone has an idea of the performance for a small cluster.
Speed Benchmark
Dear Spark users: I want to see if anyone has an idea of the performance for a small cluster. Reading from HDFS, what should be the performance of a count() operation on a 10GB RDD with 100M rows using pyspark? I looked into the CPU usage; all 6 cores are at 100%. Details:
- master yarn-client
- num-executors 3
- executor-cores 2
- driver-memory 5g
- executor-memory 2g
- Distribution: Cloudera
I also attached the screenshot. Right now, I'm at 17 minutes, which seems quite slow. Any idea what a decent performance with a similar configuration would be? If it's way off, I would appreciate any pointers as to ways to improve performance. Thanks. Best, Guillaume
Re: Speed Benchmark
Hi Jason: Thanks for your feedback. Beside the information above I mentioned, there are 3 machines in the cluster. *1st one*: Driver + has a bunch of Hadoop services. 32GB of RAM, 8 cores (2 used) *2nd + 3rd:* 16GB of RAM, 4 cores (2 used each) I hope this helps clarify. Thx. GG Best, Guillaume Guy * +1 919 - 972 - 8750* On Fri, Feb 27, 2015 at 9:06 AM, Jason Bell jaseb...@gmail.com wrote: How many machines are on the cluster? And what is the configuration of those machines (Cores/RAM)? "Small cluster" is a very subjective statement. Guillaume Guy wrote: Dear Spark users: I want to see if anyone has an idea of the performance for a small cluster.
Re: Speed Benchmark
Hi Sean: Thanks for your feedback. Scala is much faster. The count is performed in ~1 minute (vs 17 min). I would expect Scala to be 2-5X faster, but this gap seems to be more than that. Is that also your conclusion? Thanks. Best, Guillaume Guy * +1 919 - 972 - 8750* On Fri, Feb 27, 2015 at 9:12 AM, Sean Owen so...@cloudera.com wrote: That's very slow, and there are a lot of possible explanations. The first one that comes to mind is: I assume your YARN and HDFS are on the same machines, but are you running executors on all HDFS nodes when you run this? If not, a lot of these reads could be remote. You have 6 executor slots, but your data exists in 96 blocks on HDFS. You could read with up to 96-way parallelism. You say you're CPU-bound though, but normally I'd wonder if this was simply a case of under-using parallelism. I also wonder if the bottleneck is something to do with pyspark in this case; it might be good to just try it in the spark-shell to check. On Fri, Feb 27, 2015 at 2:00 PM, Guillaume Guy guillaume.c@gmail.com wrote: Dear Spark users: I want to see if anyone has an idea of the performance for a small cluster. Reading from HDFS, what should be the performance of a count() operation on a 10GB RDD with 100M rows using pyspark? I looked into the CPU usage; all 6 cores are at 100%. Details: master yarn-client num-executors 3 executor-cores 2 driver-memory 5g executor-memory 2g Distribution: Cloudera I also attached the screenshot. Right now, I'm at 17 minutes, which seems quite slow. Any idea what a decent performance with a similar configuration would be? If it's way off, I would appreciate any pointers as to ways to improve performance. Thanks. Best, Guillaume
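Sean's parallelism point can be made concrete with a little arithmetic. A sketch (the helper name is mine; the 96-block figure is from his email, and corresponds roughly to 10GB at a 128MB HDFS block size):

```python
import math

def read_waves(num_blocks, executors, cores_per_executor):
    """How many sequential 'waves' of tasks are needed to scan
    all HDFS blocks with the given number of task slots."""
    slots = executors * cores_per_executor
    return slots, math.ceil(num_blocks / slots)

slots, waves = read_waves(num_blocks=96, executors=3, cores_per_executor=2)
print(slots, waves)  # 6 16
```

With only 6 slots, the 96 input tasks run in 16 waves, so the job's wall-clock time is dominated by slot count rather than by the data's available 96-way parallelism.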
Re: Speed Benchmark
It is a simple text file. I'm not using SQL, just doing an rdd.count() on it. Does the bug affect it? On Friday, February 27, 2015, Davies Liu dav...@databricks.com wrote: What is this dataset? Text file or parquet file? There is an issue with serialization in Spark SQL, which will make it very slow, see https://issues.apache.org/jira/browse/SPARK-6055, will be fixed very soon. Davies On Fri, Feb 27, 2015 at 1:59 PM, Guillaume Guy guillaume.c@gmail.com wrote: Hi Sean: Thanks for your feedback. Scala is much faster. The count is performed in ~1 minute (vs 17 min). I would expect Scala to be 2-5X faster, but this gap seems to be more than that. Is that also your conclusion? Thanks. Best, Guillaume Guy +1 919 - 972 - 8750 On Fri, Feb 27, 2015 at 9:12 AM, Sean Owen so...@cloudera.com wrote: That's very slow, and there are a lot of possible explanations. The first one that comes to mind is: I assume your YARN and HDFS are on the same machines, but are you running executors on all HDFS nodes when you run this? If not, a lot of these reads could be remote. You have 6 executor slots, but your data exists in 96 blocks on HDFS. You could read with up to 96-way parallelism. You say you're CPU-bound though, but normally I'd wonder if this was simply a case of under-using parallelism. I also wonder if the bottleneck is something to do with pyspark in this case; it might be good to just try it in the spark-shell to check. On Fri, Feb 27, 2015 at 2:00 PM, Guillaume Guy guillaume.c@gmail.com wrote: Dear Spark users: I want to see if anyone has an idea of the performance for a small cluster. Reading from HDFS, what should be the performance of a count() operation on a 10GB RDD with 100M rows using pyspark? I looked into the CPU usage; all 6 cores are at 100%. Details: master yarn-client num-executors 3 executor-cores 2 driver-memory 5g executor-memory 2g Distribution: Cloudera I also attached the screenshot.
Right now, I'm at 17 minutes which seems quite slow. Any idea how a decent performance with similar configuration? If it's way off, I would appreciate any pointers as to ways to improve performance. Thanks. Best, Guillaume -- Best, Guillaume Guy * +1 919 - 972 - 8750*
Re: Spark can't pickle class: error cannot lookup attribute
Thanks Davies and Eric. I followed Davies' instructions and it works wonderfully. I would add that you can also load these scripts in the pyspark shell too: pyspark --py-files support.py, where support.py is your script containing your class as Davies described. Best, Guillaume Guy * +1 919 - 972 - 8750* On Wed, Feb 18, 2015 at 11:48 PM, Davies Liu dav...@databricks.com wrote: Currently, PySpark can not support pickling a class object in the current script ('__main__'); the workaround could be to put the implementation of the class into a separate module, then use bin/spark-submit --py-files xxx.py to deploy it. in xxx.py: class test(object): def __init__(self, a, b): self.total = a + b in job.py: from xxx import test a = sc.parallelize([(True,False),(False,False)]) a.map(lambda (x,y): test(x,y)) run it by: bin/spark-submit --py-files xxx.py job.py On Wed, Feb 18, 2015 at 1:48 PM, Guillaume Guy guillaume.c@gmail.com wrote: Hi, This is a duplicate of the stack-overflow question here. I hope to generate more interest on this mailing list. The problem: I am running into some attribute lookup problems when trying to initiate a class within my RDD. My workflow is quite standard: 1- Start with an RDD 2- Take each element of the RDD, initiate an object for each 3- Reduce (I will write a method that will define the reduce operation later on) Here is #2: class test(object): def __init__(self, a,b): self.total = a + b a = sc.parallelize([(True,False),(False,False)]) a.map(lambda (x,y): test(x,y)) Here is the error I get: PicklingError: Can't pickle <class '__main__.test'>: attribute lookup __main__.test failed I'd like to know if there is any way around it. Please, answer with a working example to achieve the intended results (i.e. creating an RDD of objects of class test). Thanks in advance! Related question: https://groups.google.com/forum/#!topic/edx-code/9xzRJFyQwn GG
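The workaround can be exercised locally without Spark: put the class in its own module, import it, and confirm that pickle round-trips instances. The temp-module scaffolding below is illustrative (it simulates what shipping support.py via --py-files achieves), not part of any Spark API:

```python
import importlib
import os
import pickle
import sys
import tempfile

# Write the class into a separate module, as --py-files would ship it.
module_src = (
    "class Test(object):\n"
    "    def __init__(self, a, b):\n"
    "        self.total = a + b\n"
)
tmpdir = tempfile.mkdtemp()
with open(os.path.join(tmpdir, "support.py"), "w") as f:
    f.write(module_src)
sys.path.insert(0, tmpdir)
support = importlib.import_module("support")

# Instances of a module-level class pickle by reference to the module,
# so workers that also have support.py on their path can unpickle them.
obj = support.Test(True, False)
restored = pickle.loads(pickle.dumps(obj))
print(restored.total)  # 1  (True + False)
```

A class defined in '__main__' has no importable module for the deserializer on the worker to look the class up in, which is the "attribute lookup __main__.test failed" error in the original question.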
Spark can't pickle class: error cannot lookup attribute
Hi, This is a duplicate of the stack-overflow question here: http://stackoverflow.com/questions/28569374/spark-returning-pickle-error-cannot-lookup-attribute. I hope to generate more interest on this mailing list.

The problem: I am running into some attribute lookup problems when trying to initiate a class within my RDD. My workflow is quite standard:
1- Start with an RDD
2- Take each element of the RDD, initiate an object for each
3- Reduce (I will write a method that will define the reduce operation later on)

Here is #2:

class test(object):
    def __init__(self, a, b):
        self.total = a + b

a = sc.parallelize([(True,False),(False,False)])
a.map(lambda (x,y): test(x,y))

Here is the error I get: PicklingError: Can't pickle class '__main__.test': attribute lookup __main__.test failed

I'd like to know if there is any way around it. Please answer with a working example that achieves the intended result (i.e. creating an RDD of objects of class test). Thanks in advance!

Related question:
- https://groups.google.com/forum/#!topic/edx-code/9xzRJFyQwn

GG
Re: Mllib native netlib-java/OpenBLAS
Hi, I had the same problem, and tried to compile with mvn -Pnetlib-lgpl:

$ mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package

Unfortunately, the resulting assembly jar still lacked the netlib-system class. This command:

$ jar tvf assembly/target/scala-2.10/spark-assembly-1.1.1-hadoop2.3.0.jar | grep netlib | grep Native

returns nothing... (and for some reason, including the netlib-all jar in my shipped jar did not solve the problem either; apparently the classloader does not find the class.)

In Spark, the profile is defined in the mllib submodule, but -Pnetlib-lgpl seems not to be transmitted from the parent pom.xml to the child. I don't know how to fix that cleanly (I just added <activeByDefault>true</activeByDefault> in mllib's pom.xml); maybe it's just a problem with my maven version (3.0.5).

Guillaume

> I tried building Spark from the source, by downloading it and running: mvn -Pnetlib-lgpl -DskipTests clean package

-- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
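Spelled out, the workaround described above amounts to adding an activation block to the netlib-lgpl profile in mllib/pom.xml. A sketch of what that looks like (the rest of the profile stays as-is):

```xml
<profile>
  <id>netlib-lgpl</id>
  <activation>
    <!-- workaround: force the profile on, since -Pnetlib-lgpl is not
         propagated from the parent pom down to this child module -->
    <activeByDefault>true</activeByDefault>
  </activation>
  <!-- existing dependencies of the profile unchanged -->
</profile>
```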
Maven profile in MLLib netlib-lgpl not working (1.1.1)
Hi, Issue created: https://issues.apache.org/jira/browse/SPARK-4816. Probably a maven-related question for profiles in child modules. I couldn't find a clean solution, just a workaround: modify pom.xml in the mllib module to force activation of the netlib-lgpl profile. Hope a maven expert will help. Guillaume

> +1 with 1.3-SNAPSHOT.
>
> On Mon, Dec 1, 2014 at 5:49 PM, agg212 alexander_galaka...@brown.edu wrote:
>
>> Thanks for your reply, but I'm still running into issues installing/configuring the native libraries for MLlib. Here are the steps I've taken, please let me know if anything is incorrect.
>> - Download Spark source
>> - unzip and compile using `mvn -Pnetlib-lgpl -DskipTests clean package`
>> - Run `sbt/sbt publish-local`
>> The last step fails with the following error (full stack trace is attached here: error.txt http://apache-spark-user-list.1001560.n3.nabble.com/file/n20110/error.txt):
>> [error] (sql/compile:compile) java.lang.AssertionError: assertion failed: List(object package$DebugNode, object package$DebugNode)
>> Do I still have to install OpenBLAS/anything else if I build Spark from the source using the -Pnetlib-lgpl flag? Also, do I change the Spark version (from 1.1.0 to 1.2.0-SNAPSHOT) in the .sbt file for my app? Thanks!
>> -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Mllib-native-netlib-java-OpenBLAS-tp19662p20110.html Sent from the Apache Spark User List mailing list archive at Nabble.com.

-- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Re: java.lang.OutOfMemoryError: Requested array size exceeds VM limit
Hi, The array size you (or the serializer) try to allocate is just too big for the JVM. No configuration can help: https://plumbr.eu/outofmemoryerror/requested-array-size-exceeds-vm-limit The only option is to split your problem further by increasing parallelism. Guillaume

> Hi, I'm using Spark 1.1.0 and I'm having some issues setting up memory options. I get "Requested array size exceeds VM limit" and I'm probably missing something regarding memory configuration (https://spark.apache.org/docs/1.1.0/configuration.html). My server has 30G of memory and these are my current settings:
>
> ## this one seems to be deprecated
> export SPARK_MEM='25g'
> ## worker memory seems to be the memory for each worker (by default we have a worker for each core)
> export SPARK_WORKER_MEMORY='5g'
>
> I probably need to specify some options using SPARK_DAEMON_JAVA_OPTS, but I'm not quite sure how. I have tried some different options like the following, but I still couldn't make it right:
>
> export SPARK_DAEMON_JAVA_OPTS='-Xmx8G -XX:+UseCompressedOops'
> export JAVA_OPTS='-Xmx8G -XX:+UseCompressedOops'
>
> Does anyone have any idea how I can approach this?
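To put a number on "split your problem further": a serialized block must fit in a single Java byte array, which is capped at Integer.MAX_VALUE (2^31 - 1) entries regardless of heap size. A back-of-the-envelope helper for the minimum partition count (a sketch, not a Spark API):

```python
def min_partitions(total_bytes, max_block=2**31 - 1):
    """Smallest partition count that keeps each serialized block
    under the JVM array-size cap (assuming an even split)."""
    return -(-total_bytes // max_block)  # ceiling division
```

For example, a 5 GB serialized dataset needs at least 3 partitions just to be representable; in practice you want far smaller blocks than the hard cap.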
14/10/11 13:00:16 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/10/11 13:00:16 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1566 non-empty blocks out of 1566 blocks
14/10/11 13:00:16 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 4 ms
14/10/11 13:02:06 INFO ExternalAppendOnlyMap: Thread 63 spilling in-memory map of 3925 MB to disk (1 time so far)
14/10/11 13:05:17 INFO ExternalAppendOnlyMap: Thread 63 spilling in-memory map of 3925 MB to disk (2 times so far)
14/10/11 13:09:15 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 1566)
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/10/11 13:09:15 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-2,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140

Arian

-- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Re: What does KryoException: java.lang.NegativeArraySizeException mean?
Well, reading your logs, here is what happens: you do a combineByKey (so you probably have a join somewhere), which spills to disk because it's too big. To spill to disk it serializes, and the blocks are 2GB. From a 2GB dataset, it's easy to expand to several TB. Increase parallelism, make sure that your combineByKey has enough distinct keys, and see what happens. Guillaume

> Thank you, Guillaume, my dataset is not that large, it's totally ~2GB
>
> 2014-10-20 16:58 GMT+08:00 Guillaume Pitel guillaume.pi...@exensa.com:
>
>> Hi, It happened to me with blocks which take more than 1 or 2 GB once serialized. I think the problem was that during serialization, a byte array is created, and arrays in Java are indexed by ints. When the serializer needs to increase the buffer size, it does so somehow, but then writing into the array leads to an error. Don't know if your problem is the same, but maybe. In general, Java and Java libraries do not check for oversized arrays, which is really bad when you play with big data. Guillaume
>
>>> The exception drives me crazy, because it occurs randomly. I didn't know which line of my code causes this exception. I didn't even understand what KryoException: java.lang.NegativeArraySizeException means, or even implies?
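The combineByKey contract referred to above can be sketched in plain Python (no Spark): values are first combined within each partition, then the per-partition combiners are merged. With too few distinct keys, each per-key combiner grows huge, which is what blows up when the map is spilled and serialized. Names and the partitioning scheme below are illustrative:

```python
from collections import defaultdict

def combine_by_key(pairs, create, merge_value, merge_combiners, n_parts=4):
    # map side: combine values within each simulated partition
    parts = defaultdict(list)
    for k, v in pairs:
        parts[hash(k) % n_parts].append((k, v))
    partials = []
    for chunk in parts.values():
        acc = {}
        for k, v in chunk:
            acc[k] = merge_value(acc[k], v) if k in acc else create(v)
        partials.append(acc)
    # reduce side: merge the per-partition combiners
    out = {}
    for acc in partials:
        for k, c in acc.items():
            out[k] = merge_combiners(out[k], c) if k in out else c
    return out
```

For example, summing per key: `combine_by_key([("a", 1), ("a", 2), ("b", 3)], lambda v: v, lambda a, v: a + v, lambda a, b: a + b)` yields `{"a": 3, "b": 3}`. If all values share one key, the single combiner holds the whole dataset, no matter how high the parallelism.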
14/10/20 15:59:01 WARN scheduler.TaskSetManager: Lost task 32.2 in stage 0.0 (TID 181, gs-server-1000): com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException
Serialization trace:
value (org.apache.spark.sql.catalyst.expressions.MutableAny)
values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow)
otherElements (org.apache.spark.util.collection.CompactBuffer)
    com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
    com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
    com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
    com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
    com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:38)
    com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:34)
    com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)
    org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
    org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:203)
    org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
    org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:90)
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:89)
    org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625)
    org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615
Re: Delayed hotspot optimizations in Spark
Hi, Could it be due to GC? I read it may happen if your program starts with a small heap. What are your -Xms and -Xmx values? Print GC stats with -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps. Guillaume

> Hello spark users and developers! I am using hdfs + spark sql + hive schema + parquet as storage format. I have a lot of parquet files - one file fits one hdfs block, for one day. The strange thing is the very slow first query in spark sql. To reproduce the situation I use only one core, and I get 97s for the first query and only 13s for all subsequent ones. Sure, I query different data, but it has the same structure and size. The situation can be reproduced after restarting the thrift server. Here is information about parquet file reading from a worker node:
>
> Slow one: Oct 10, 2014 2:26:53 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1560251 records from 30 columns in 11686 ms: 133.51454 rec/ms, 4005.4363 cell/ms
>
> Fast one: Oct 10, 2014 2:31:30 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1568899 records from 1 columns in 1373 ms: 1142.6796 rec/ms, 1142.6796 cell/ms
>
> As you can see, the second read is 10x faster than the first. Most of the query time is spent reading the parquet file. This problem is really annoying, because most of my spark jobs contain just one sql query plus data processing, and to speed up my jobs I put a special warmup query in front of every job. My assumption is that it is hotspot optimization happening during the first read. Do you have any idea how to confirm/solve this performance problem? Thanks for advice!
>
> p.s. I see billions of hotspot optimizations with -XX:+PrintCompilation but cannot figure out which are important and which are not.
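For the record, on Spark 1.x the flags above can be passed to each executor through spark-defaults.conf rather than hand-edited launch scripts. A sketch (diagnostic flags only; add heap sizing to taste):

```
# conf/spark-defaults.conf -- sketch, diagnostic JVM flags for the executors
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintCompilation
```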
Problem with very slow behaviour of TorrentBroadcast vs. HttpBroadcast
Hi, We've had some performance issues since switching to 1.1.0, and we finally found the origin: TorrentBroadcast seems to be very slow in our setting (and it became the default with 1.1.0).

The logs of a 4MB variable with TorrentBroadcast (15s):

14/10/01 15:47:13 INFO storage.MemoryStore: Block broadcast_84_piece1 stored as bytes in memory (estimated size 171.6 KB, free 7.2 GB)
14/10/01 15:47:13 INFO storage.BlockManagerMaster: Updated info of block broadcast_84_piece1
14/10/01 15:47:23 INFO storage.MemoryStore: ensureFreeSpace(4194304) called with curMem=1401611984, maxMem=9168696115
14/10/01 15:47:23 INFO storage.MemoryStore: Block broadcast_84_piece0 stored as bytes in memory (estimated size 4.0 MB, free 7.2 GB)
14/10/01 15:47:23 INFO storage.BlockManagerMaster: Updated info of block broadcast_84_piece0
14/10/01 15:47:23 INFO broadcast.TorrentBroadcast: Reading broadcast variable 84 took 15.202260006 s
14/10/01 15:47:23 INFO storage.MemoryStore: ensureFreeSpace(4371392) called with curMem=1405806288, maxMem=9168696115
14/10/01 15:47:23 INFO storage.MemoryStore: Block broadcast_84 stored as values in memory (estimated size 4.2 MB, free 7.2 GB)

And with HttpBroadcast (0.3s):

14/10/01 16:05:58 INFO broadcast.HttpBroadcast: Started reading broadcast variable 147
14/10/01 16:05:58 INFO storage.MemoryStore: ensureFreeSpace(4369376) called with curMem=1373493232, maxMem=9168696115
14/10/01 16:05:58 INFO storage.MemoryStore: Block broadcast_147 stored as values in memory (estimated size 4.2 MB, free 7.3 GB)
14/10/01 16:05:58 INFO broadcast.HttpBroadcast: Reading broadcast variable 147 took 0.320907112 s
14/10/01 16:05:58 INFO storage.BlockManager: Found block broadcast_147 locally

Since Torrent is supposed to perform much better than Http, we suspect a configuration error on our side, but are unable to pin it down. Does someone have any idea of the origin of the problem? For now we're sticking with the HttpBroadcast workaround.
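For anyone searching later: the HttpBroadcast workaround is a one-line setting. In Spark 1.1 the broadcast implementation is selected with spark.broadcast.factory; a sketch of the spark-defaults.conf line (class name as documented for 1.1):

```
spark.broadcast.factory  org.apache.spark.broadcast.HttpBroadcastFactory
```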
Guillaume -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Re: Kryo deserialisation error
Hi, We've got the same problem here (it happens randomly):

Unable to find class: 6 4 Ú4Ú» 8 4î4Úº*Q|T4â` j4 Ǥ4ê´g8 4 ¾4Ú» 4 4Ú» pE4ʽ4ں*WsѴμˁ4ڻ4ʤ4ցbל4ڻ 4[͝4[ۦ44ڻ!~44ڻΡ4Ƈ4Pҍ4҇%Q4ɋ4ifj4w4Y4ڻ*¸4☮R4ҲR4X4ڻ 4]5ᴁX^34l[?s4ƾ4ڻ8BH4Z4@4jჴ? 4ڻ 7B4ٛ/v4ꃂE4뿁4J04릁4%44ؕ w\44 Ӓ¯ٕ4ڻ/lv4ⴁ40喴Ƴ䂁4¸C4P4ڻ _o4lbʂԛ4각 4^x4ڻ

Clearly a stream corruption problem. We've been running fine (afaik) on 1.0.0 for two weeks, switched to 1.0.1 this Monday, and since then this kind of problem occurs randomly.

Guillaume Pitel

> Not sure if this helps, but it does seem to be part of a name in a Wikipedia article, and Wikipedia is the data set. So something is reading this class name from the data. http://en.wikipedia.org/wiki/Carl_Fridtjof_Rode
>
> On Thu, Jul 17, 2014 at 9:40 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Seems like there is some sort of stream corruption, causing the Kryo read to read a weird class name from the stream (the name "arl Fridtjof Rode" in the exception cannot be a class!). Not sure how to debug this. @Patrick: Any idea?

-- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
Re: build spark assign version number myself?
You can specify a custom name with the --name option. It will still contain 1.1.0-SNAPSHOT, but at least you can specify your company name. If you want to replace SNAPSHOT with your company name, you will have to edit make-distribution.sh and replace the following line:

VERSION=$(mvn help:evaluate -Dexpression=project.version 2>/dev/null | grep -v INFO | tail -n 1)

with something like:

COMPANYNAME=SoullessMegaCorp
VERSION=$(mvn help:evaluate -Dexpression=project.version 2>/dev/null | grep -v INFO | tail -n 1 | sed -e 's/SNAPSHOT/$COMPANYNAME')

and so on for other packages that use their own version scheme.

> On Tue, Jul 1, 2014 at 9:21 AM, majian maj...@nq.com wrote: Hi all: I'm trying to compile spark by executing './make-distribution.sh --hadoop 0.20.205.0 --tgz'. After the compilation I found that the default version number is 1.1.0-SNAPSHOT, i.e. spark-1.1.0-SNAPSHOT-bin-0.20.205.tgz. Who knows how to assign a version number myself, for example spark-1.1.0-company-bin-0.20.205.tgz? Thanks, majian
Re: build spark assign version number myself?
Sorry, there's a typo in my previous post; the line should read:

VERSION=$(mvn help:evaluate -Dexpression=project.version 2>/dev/null | grep -v INFO | tail -n 1 | sed -e "s/SNAPSHOT/$COMPANYNAME/g")

(note the double quotes around the sed expression, so that the shell expands $COMPANYNAME).

> On Tue, Jul 1, 2014 at 10:35 AM, Guillaume Ballet gbal...@gmail.com wrote: You can specify a custom name with the --name option. It will still contain 1.1.0-SNAPSHOT, but at least you can specify your company name. If you want to replace SNAPSHOT with your company name, you will have to edit make-distribution.sh and replace the following line:
>
> VERSION=$(mvn help:evaluate -Dexpression=project.version 2>/dev/null | grep -v INFO | tail -n 1)
>
> with something like:
>
> COMPANYNAME=SoullessMegaCorp
> VERSION=$(mvn help:evaluate -Dexpression=project.version 2>/dev/null | grep -v INFO | tail -n 1 | sed -e 's/SNAPSHOT/$COMPANYNAME')
>
> and so on for other packages that use their own version scheme.
>
>> On Tue, Jul 1, 2014 at 9:21 AM, majian maj...@nq.com wrote: Hi all: I'm trying to compile spark by executing './make-distribution.sh --hadoop 0.20.205.0 --tgz'. After the compilation I found that the default version number is 1.1.0-SNAPSHOT, i.e. spark-1.1.0-SNAPSHOT-bin-0.20.205.tgz. Who knows how to assign a version number myself, for example spark-1.1.0-company-bin-0.20.205.tgz? Thanks, majian
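One subtlety worth spelling out: inside single quotes the shell does not expand $COMPANYNAME, so the sed expression must be double-quoted. A quick check of just the substitution, with the mvn pipeline replaced by a fixed version string (values are examples):

```shell
COMPANYNAME=SoullessMegaCorp
VERSION=$(echo "1.1.0-SNAPSHOT" | sed -e "s/SNAPSHOT/$COMPANYNAME/g")
echo "$VERSION"  # 1.1.0-SoullessMegaCorp
```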
Re: Huge matrix
On 04/12/2014 06:35 PM, Xiaoli Li wrote:

> Hi Guillaume, This sounds like a good idea to me. I am a newbie here. Could you further explain how you will determine which clusters to keep? According to the distance between each element and each cluster center?

Yes, for each element you want to compute the neighbours of, you just have to compute its distance to each cluster center. Then you keep the closest clusters.

> Will you keep several clusters for each element when searching for nearest neighbours? Thanks.

Yes, generally you will. It depends on how many neighbours you want, and how much approximation you allow in the results.

Guillaume

-- Guillaume PITEL, Président +33(0)6 25 48 86 80 eXenSa S.A.S. 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
Re: Huge matrix
Hi, I'm doing this here for multiple tens of millions of elements (and the goal is to reach multiple billions), on a relatively small cluster (7 nodes, 4 cores, 32GB RAM). We use multiprobe KLSH. All you have to do is run a k-means on your data, then compute the distance between each element and each cluster center, keep a few clusters and only look into these clusters for nearest neighbours. This method is known to perform very well and vastly speeds up your computation. The hardest part is to decide how many clusters to compute, and how many to keep. As a rule of thumb, I generally want 300-1000 elements per cluster, and use 5-20 clusters. Guillaume

> I am implementing an algorithm using Spark. I have one million users. I need to compute the similarity between each pair of users using some user attributes. For each user, I need to get the top k most similar users. What is the best way to implement this? Thanks.

-- Guillaume PITEL, Président +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53 eXenSa S.A.S. 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
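A toy single-machine version of the multiprobe idea (plain NumPy, not the KLSH code we run; parameter names are illustrative): cluster once with k-means, then for each query probe only the few closest clusters and search them exhaustively.

```python
import numpy as np

rng = np.random.default_rng(0)

def kmeans(X, k, iters=10):
    # naive Lloyd iterations; fine for a sketch, use a library for real data
    centers = X[rng.choice(len(X), size=k, replace=False)].copy()
    for _ in range(iters):
        labels = ((X[:, None, :] - centers[None, :, :]) ** 2).sum(-1).argmin(1)
        for j in range(k):
            mask = labels == j
            if mask.any():
                centers[j] = X[mask].mean(0)
    labels = ((X[:, None, :] - centers[None, :, :]) ** 2).sum(-1).argmin(1)
    return centers, labels

def neighbours(X, centers, labels, q, n_probe=2, k=3):
    # keep only the n_probe clusters closest to the query,
    # then rank candidates from those clusters by true distance
    probe = ((centers - q) ** 2).sum(-1).argsort()[:n_probe]
    cand = np.flatnonzero(np.isin(labels, probe))
    order = cand[((X[cand] - q) ** 2).sum(-1).argsort()]
    return order[:k]
```

The trade-off is exactly the one described above: more, smaller clusters make each probe cheap but raise the risk that a true neighbour sits in an unprobed cluster; probing more clusters recovers accuracy at extra cost.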
Re: K-means faster on Mahout then on Spark
Maybe with MEMORY_ONLY, spark has to recompute the RDD several times because it doesn't fit in memory. That makes things run slower. As a general safe rule, use MEMORY_AND_DISK_SER.

Guillaume Pitel - Président d'eXenSa

Prashant Sharma scrapco...@gmail.com a écrit :

> I think Mahout uses FuzzyKmeans, which is a different algorithm and is not iterative. Prashant Sharma
>
> On Tue, Mar 25, 2014 at 6:50 PM, Egor Pahomov pahomov.e...@gmail.com wrote:
>
>> Hi, I'm running a benchmark which compares Mahout and SparkML. For now I have the following results for k-means:
>> Number of iterations= 10, number of elements = 1000, mahout time = 602, spark time = 138
>> Number of iterations= 40, number of elements = 1000, mahout time = 1917, spark time = 330
>> Number of iterations= 70, number of elements = 1000, mahout time = 3203, spark time = 388
>> Number of iterations= 10, number of elements = 1, mahout time = 1235, spark time = 2226
>> Number of iterations= 40, number of elements = 1, mahout time = 2755, spark time = 6388
>> Number of iterations= 70, number of elements = 1, mahout time = 4107, spark time = 10967
>> Number of iterations= 10, number of elements = 10, mahout time = 7070, spark time = 25268
>> Time in seconds. It runs on a Yarn cluster with about 40 machines. Elements for clusterization are randomly created. When I changed the persistence level from MEMORY to MEMORY_AND_DISK, on big data spark started to work faster. What am I missing? See my benchmarking code in attachment.
>> -- Sincerely yours, Egor Pakhomov, Scala Developer, Yandex
Re: Spark temp dir (spark.local.dir)
I'm not 100% sure but I think it goes like this: spark.local.dir can and should be set both on the executors and on the driver (if the driver broadcasts variables, the files will be stored in this directory); SPARK_WORKER_DIR is where the jars and the log output of the executors are placed (default $SPARK_HOME/work/) and it should be cleaned regularly. In $SPARK_HOME/logs are found the logs of the workers and master.

Guillaume

> Hi, I'm confused about -Dspark.local.dir and SPARK_WORKER_DIR (--work-dir). What's the difference? I have set -Dspark.local.dir for all my worker nodes but I'm still seeing directories being created in /tmp when the job is running. I have also tried setting -Dspark.local.dir when I run the application. Thanks!

-- Guillaume PITEL, Président +33(0)6 25 48 86 80 eXenSa S.A.S. 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
Re: Spark temp dir (spark.local.dir)
>> spark.local.dir can and should be set both on the executors and on the driver (if the driver broadcasts variables, the files will be stored in this directory)
>
> Do you mean the worker nodes?

No, only the driver broadcasts, I think.

> Don't think they are jetty connectors, and the directories are empty:
> /tmp/spark-3e330cdc-7540-4313-9f32-9fa109935f17/jars
> /tmp/spark-3e330cdc-7540-4313-9f32-9fa109935f17/files

Indeed, I must have confused that with something else. The spark local dir contains directories starting with spark-local-*, so I don't know what these files are.

> I run the application like this, even with java.io.tmpdir:
> bin/run-example -Dspark.executor.memory=14g -Dspark.local.dir=/mnt/storage1/lm -Djava.io.tmpdir=/mnt/storage1/lm org.apache.spark.examples.SparkLR spark://oct1:7077 10

How do you pass spark.local.dir to the workers? In SPARK_JAVA_OPTS during SparkContext creation? It should probably be set in spark-env.sh because it can differ on each node.

Guillaume

On 13 Mar, 2014, at 5:33 pm, Guillaume Pitel guillaume.pi...@exensa.com wrote:

> Also, I think the jetty connector will create a small file or directory in /tmp regardless of spark.local.dir. It's very small, about 10KB.
>
> Guillaume
>
>> I'm not 100% sure but I think it goes like this: spark.local.dir can and should be set both on the executors and on the driver (if the driver broadcasts variables, the files will be stored in this directory); SPARK_WORKER_DIR is where the jars and the log output of the executors are placed (default $SPARK_HOME/work/) and it should be cleaned regularly. In $SPARK_HOME/logs are found the logs of the workers and master.
>>
>> Guillaume
>>
>>> Hi, I'm confused about -Dspark.local.dir and SPARK_WORKER_DIR (--work-dir). What's the difference? I have set -Dspark.local.dir for all my worker nodes but I'm still seeing directories being created in /tmp when the job is running. I have also tried setting -Dspark.local.dir when I run the application. Thanks!
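A sketch of the per-node setting discussed above, in conf/spark-env.sh (paths are examples):

```
# conf/spark-env.sh -- set on every node; values are examples
SPARK_JAVA_OPTS="-Dspark.local.dir=/mnt/storage1/lm"   # scratch space for shuffle/broadcast files
SPARK_WORKER_DIR=/mnt/storage1/work                    # executor jars and stdout/stderr logs
```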
-- Guillaume PITEL, Président +33(0)6 25 48 86 80 eXenSa S.A.S. 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05