A rough estimate of the worst-case memory requirement on the driver is
about 2 * k * runs * numFeatures * numPartitions * 8 bytes. The factor
of 2 is there because the previous centers are still in memory while the
driver receives the new center updates.
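For example, plugging in the numbers from this thread (a rough sketch;
runs defaults to 1 in KMeans.train, and the 10 partitions come from your
parallelize call):

def worst_case_driver_bytes(k, runs, num_features, num_partitions):
    # Factor of 2: the previous centers stay in memory while the
    # driver collects the updated centers from each partition.
    return 2 * k * runs * num_features * num_partitions * 8

est = worst_case_driver_bytes(k=1500, runs=1,
                              num_features=80000, num_partitions=10)
print(est / 1024 ** 3)  # ~17.9 GiB, more than a 15g driver heap

-Xiangrui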

On Fri, Jun 19, 2015 at 9:02 AM, Rogers Jeffrey
<rogers.john2...@gmail.com> wrote:
> Thanks. Setting the driver memory property worked for K=1000, but when I
> increased K to 1500 I get the following error:
>
> 15/06/19 09:38:44 INFO ContextCleaner: Cleaned accumulator 7
>
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
> 172.31.3.51:45157 in memory (size: 1568.0 B, free: 10.4 GB)
>
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
> 172.31.9.50:59356 in memory (size: 1568.0 B, free: 73.6 GB)
>
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
> 172.31.9.50:60934 in memory (size: 1568.0 B, free: 73.6 GB)
>
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
> 172.31.15.51:37825 in memory (size: 1568.0 B, free: 73.6 GB)
>
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
> 172.31.15.51:60610 in memory (size: 1568.0 B, free: 73.6 GB)
>
> 15/06/19 09:38:44 INFO ContextCleaner: Cleaned shuffle 5
>
> Exception in thread "Thread-2" java.lang.OutOfMemoryError: Requested array
> size exceeds VM limit
>
> at java.util.Arrays.copyOf(Arrays.java:2367)
>
> at
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>
> at
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
>
> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:587)
>
> at java.lang.StringBuilder.append(StringBuilder.java:214)
>
> at py4j.Protocol.getOutputCommand(Protocol.java:305)
>
> at py4j.commands.CallCommand.execute(CallCommand.java:82)
>
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Exception in thread "Thread-300" java.lang.OutOfMemoryError: Requested array
> size exceeds VM limit
>
> at java.util.Arrays.copyOf(Arrays.java:2367)
>
> at
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>
> at
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
>
> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:587)
>
> at java.lang.StringBuilder.append(StringBuilder.java:214)
>
> at py4j.Protocol.getOutputCommand(Protocol.java:305)
>
> at py4j.commands.CallCommand.execute(CallCommand.java:82)
>
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>
>
> Is there any method/guideline through which I can understand the memory
> requirement beforehand and set the configuration appropriately?
>
> Regards,
> Rogers Jeffrey L
>
> On Thu, Jun 18, 2015 at 8:14 PM, Rogers Jeffrey <rogers.john2...@gmail.com>
> wrote:
>>
>> I am submitting the application from a Python notebook. I am launching
>> pyspark as follows:
>>
>> SPARK_PUBLIC_DNS=ec2-54-165-202-17.compute-1.amazonaws.com
>> SPARK_WORKER_CORES=8 SPARK_WORKER_MEMORY=15g SPARK_MEM=30g OUR_JAVA_MEM=30g
>> SPARK_DAEMON_JAVA_OPTS="-XX:MaxPermSize=30g -Xms30g -Xmx30g" IPYTHON=1
>> PYSPARK_PYTHON=/usr/bin/python SPARK_PRINT_LAUNCH_COMMAND=1
>> ./spark/bin/pyspark --master
>> spark://54.165.202.17.compute-1.amazonaws.com:7077   --deploy-mode client
>>
>> I guess I should add one more argument, --conf
>> "spark.driver.memory=15g". Is that correct?
>>
>> Regards,
>> Rogers Jeffrey L
>>
>> On Thu, Jun 18, 2015 at 7:50 PM, Xiangrui Meng <men...@gmail.com> wrote:
>>>
>>> With 80,000 features and 1000 clusters, you need 80,000,000 doubles to
>>> store the cluster centers. That is ~600MB. If there are 10 partitions,
>>> you might need 6GB on the driver to collect updates from workers. I
>>> guess the driver died. Did you specify driver memory with
>>> spark-submit?
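>>>
>>> The arithmetic, spelled out (estimates, not measurements):
>>>
>>> centers = 80000 * 1000 * 8   # 640,000,000 bytes, ~600MB of doubles
>>> driver  = centers * 10       # one pending copy per partition, ~6GB
>>>
>>> -Xiangrui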
>>>
>>> On Thu, Jun 18, 2015 at 12:22 PM, Rogers Jeffrey
>>> <rogers.john2...@gmail.com> wrote:
>>> > Hi All,
>>> >
>>> > I am trying to run KMeans clustering on a large data set with 12,000
>>> > points and 80,000 dimensions. I have a Spark cluster on EC2 in
>>> > standalone mode with 8 workers running on 2 slaves with 160 GB RAM
>>> > and 40 vCPUs.
>>> >
>>> > My code is as follows:
>>> >
>>> > import numpy as np
>>> > from pyspark import SparkContext
>>> > from pyspark.mllib.clustering import KMeans
>>> > from pyspark.mllib.linalg import Vectors
>>> >
>>> > def convert_into_sparse_vector(A):
>>> >     # Keep only the non-NaN entries of the row in a SparseVector.
>>> >     non_nan_indices = np.nonzero(~np.isnan(A))
>>> >     non_nan_values = A[non_nan_indices]
>>> >     dictionary = dict(zip(non_nan_indices[0], non_nan_values))
>>> >     return Vectors.sparse(len(A), dictionary)
>>> >
>>> > X = [convert_into_sparse_vector(A) for A in complete_dataframe.values]
>>> > sc = SparkContext(appName="parallel_kmeans")
>>> > data = sc.parallelize(X, 10)  # 10 partitions
>>> > model = KMeans.train(data, 1000, initializationMode="k-means||")
>>> >
>>> > where complete_dataframe is a pandas data frame that has my data.
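>>> >
>>> > (A toy run of the conversion, on a made-up two-row frame with NaNs;
>>> > the shape and values are placeholders, not my real data:)
>>> >
>>> > import numpy as np
>>> > import pandas as pd
>>> >
>>> > complete_dataframe = pd.DataFrame([[1.0, np.nan, 3.0],
>>> >                                    [np.nan, 2.0, np.nan]])
>>> > X = [convert_into_sparse_vector(A) for A in complete_dataframe.values]
>>> > # X[0] is SparseVector(3, {0: 1.0, 2: 3.0})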
>>> >
>>> > I get the error: Py4JNetworkError: An error occurred while trying to
>>> > connect
>>> > to the Java server.
>>> >
>>> > The error trace is as follows:
>>> >
>>> >> ---------------------------------------- Exception happened during
>>> >> processing of request from ('127.0.0.1', 41360) Traceback (most recent
>>> >> call last):   File "/usr/lib64/python2.6/SocketServer.py", line 283,
>>> >> in _handle_request_noblock
>>> >>     self.process_request(request, client_address)   File
>>> >> "/usr/lib64/python2.6/SocketServer.py", line 309, in process_request
>>> >>     self.finish_request(request, client_address)   File
>>> >> "/usr/lib64/python2.6/SocketServer.py", line 322, in finish_request
>>> >>     self.RequestHandlerClass(request, client_address, self)   File
>>> >> "/usr/lib64/python2.6/SocketServer.py", line 617, in __init__
>>> >>     self.handle()   File "/root/spark/python/pyspark/accumulators.py",
>>> >> line 235, in handle
>>> >>     num_updates = read_int(self.rfile)   File
>>> >> "/root/spark/python/pyspark/serializers.py", line 544, in read_int
>>> >>     raise EOFError EOFError
>>> >> ----------------------------------------
>>> >>
>>> >>
>>> >> ---------------------------------------------------------------------------
>>> >> Py4JNetworkError                          Traceback (most recent call
>>> >> last) <ipython-input-13-3dd00c2c5e93> in <module>()
>>> >> ----> 1 model = KMeans.train(data, 1000,
>>> >> initializationMode="k-means||")
>>> >>
>>> >> /root/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k,
>>> >> maxIterations, runs, initializationMode, seed, initializationSteps,
>>> >> epsilon)
>>> >>     134         """Train a k-means clustering model."""
>>> >>     135         model = callMLlibFunc("trainKMeansModel",
>>> >> rdd.map(_convert_to_vector), k, maxIterations,
>>> >> --> 136                               runs, initializationMode, seed,
>>> >> initializationSteps, epsilon)
>>> >>     137         centers = callJavaFunc(rdd.context,
>>> >> model.clusterCenters)
>>> >>     138         return KMeansModel([c.toArray() for c in centers])
>>> >>
>>> >> /root/spark/python/pyspark/mllib/common.pyc in callMLlibFunc(name,
>>> >> *args)
>>> >>     126     sc = SparkContext._active_spark_context
>>> >>     127     api = getattr(sc._jvm.PythonMLLibAPI(), name)
>>> >> --> 128     return callJavaFunc(sc, api, *args)
>>> >>     129
>>> >>     130
>>> >>
>>> >> /root/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func,
>>> >> *args)
>>> >>     119     """ Call Java Function """
>>> >>     120     args = [_py2java(sc, a) for a in args]
>>> >> --> 121     return _java2py(sc, func(*args))
>>> >>     122
>>> >>     123
>>> >>
>>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
>>> >> __call__(self, *args)
>>> >>     534             END_COMMAND_PART
>>> >>     535
>>> >> --> 536         answer = self.gateway_client.send_command(command)
>>> >>     537         return_value = get_return_value(answer,
>>> >> self.gateway_client,
>>> >>     538                 self.target_id, self.name)
>>> >>
>>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
>>> >> send_command(self, command, retry)
>>> >>     367             if retry:
>>> >>     368                 #print_exc()
>>> >> --> 369                 response = self.send_command(command)
>>> >>     370             else:
>>> >>     371                 response = ERROR
>>> >>
>>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
>>> >> send_command(self, command, retry)
>>> >>     360          the Py4J protocol.
>>> >>     361         """
>>> >> --> 362         connection = self._get_connection()
>>> >>     363         try:
>>> >>     364             response = connection.send_command(command)
>>> >>
>>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
>>> >> _get_connection(self)
>>> >>     316             connection = self.deque.pop()
>>> >>     317         except Exception:
>>> >> --> 318             connection = self._create_connection()
>>> >>     319         return connection
>>> >>     320
>>> >>
>>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
>>> >> _create_connection(self)
>>> >>     323         connection = GatewayConnection(self.address,
>>> >> self.port,
>>> >>     324                 self.auto_close, self.gateway_property)
>>> >> --> 325         connection.start()
>>> >>     326         return connection
>>> >>     327
>>> >>
>>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
>>> >> start(self)
>>> >>     430                 'server'
>>> >>     431             logger.exception(msg)
>>> >> --> 432             raise Py4JNetworkError(msg)
>>> >>     433
>>> >>     434     def close(self):
>>> >>
>>> >> Py4JNetworkError: An error occurred while trying to connect to the
>>> >> Java server
>>> >
>>> >
>>> > Is there any specific setting that I am missing that causes this
>>> > error?
>>> >
>>> > Thanks and Regards,
>>> > Rogers Jeffrey L
>>
>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
