Thanks. Setting the driver memory property worked for K=1000, but when I increased K to 1500 I get the following error:
15/06/19 09:38:44 INFO ContextCleaner: Cleaned accumulator 7
15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.3.51:45157 in memory (size: 1568.0 B, free: 10.4 GB)
15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.9.50:59356 in memory (size: 1568.0 B, free: 73.6 GB)
15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.9.50:60934 in memory (size: 1568.0 B, free: 73.6 GB)
15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.15.51:37825 in memory (size: 1568.0 B, free: 73.6 GB)
15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.15.51:60610 in memory (size: 1568.0 B, free: 73.6 GB)
15/06/19 09:38:44 INFO ContextCleaner: Cleaned shuffle 5
Exception in thread "Thread-2" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.util.Arrays.copyOf(Arrays.java:2367)
        at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
        at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
        at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:587)
        at java.lang.StringBuilder.append(StringBuilder.java:214)
        at py4j.Protocol.getOutputCommand(Protocol.java:305)
        at py4j.commands.CallCommand.execute(CallCommand.java:82)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Exception in thread "Thread-300" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.util.Arrays.copyOf(Arrays.java:2367)
        at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
        at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
        at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:587)
        at java.lang.StringBuilder.append(StringBuilder.java:214)
        at py4j.Protocol.getOutputCommand(Protocol.java:305)
        at py4j.commands.CallCommand.execute(CallCommand.java:82)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)

Is there any method/guideline through which I can understand the memory requirement beforehand and make appropriate configurations? (A rough back-of-the-envelope sketch based on Xiangrui's numbers is included after the quoted thread below.)

Regards,
Rogers Jeffrey L

On Thu, Jun 18, 2015 at 8:14 PM, Rogers Jeffrey <rogers.john2...@gmail.com> wrote:
> I am submitting the application from a python notebook. I am launching
> pyspark as follows:
>
> SPARK_PUBLIC_DNS=ec2-54-165-202-17.compute-1.amazonaws.com
> SPARK_WORKER_CORES=8 SPARK_WORKER_MEMORY=15g SPARK_MEM=30g OUR_JAVA_MEM=30g
> SPARK_DAEMON_JAVA_OPTS="-XX:MaxPermSize=30g -Xms30g -Xmx30g" IPYTHON=1
> PYSPARK_PYTHON=/usr/bin/python SPARK_PRINT_LAUNCH_COMMAND=1
> ./spark/bin/pyspark --master spark://54.165.202.17.compute-1.amazonaws.com:7077 --deploy-mode client
>
> I guess I should be adding another extra argument --conf
> "spark.driver.memory=15g". Is that correct?
>
> Regards,
> Rogers Jeffrey L
>
> On Thu, Jun 18, 2015 at 7:50 PM, Xiangrui Meng <men...@gmail.com> wrote:
>
>> With 80,000 features and 1000 clusters, you need 80,000,000 doubles to
>> store the cluster centers. That is ~600MB. If there are 10 partitions,
>> you might need 6GB on the driver to collect updates from workers. I
>> guess the driver died. Did you specify driver memory with
>> spark-submit? -Xiangrui
>>
>> On Thu, Jun 18, 2015 at 12:22 PM, Rogers Jeffrey
>> <rogers.john2...@gmail.com> wrote:
>> > Hi All,
>> >
>> > I am trying to run KMeans clustering on a large data set with 12,000 points
>> > and 80,000 dimensions.
>> > I have a Spark cluster in EC2 standalone mode with 8 workers running on 2 slaves with 160 GB RAM and 40 VCPUs.
>> >
>> > My code is as follows:
>> >
>> > def convert_into_sparse_vector(A):
>> >     non_nan_indices = np.nonzero(~np.isnan(A))
>> >     non_nan_values = A[non_nan_indices]
>> >     dictionary = dict(zip(non_nan_indices[0], non_nan_values))
>> >     return Vectors.sparse(len(A), dictionary)
>> >
>> > X = [convert_into_sparse_vector(A) for A in complete_dataframe.values]
>> > sc = SparkContext(appName="parallel_kmeans")
>> > data = sc.parallelize(X, 10)
>> > model = KMeans.train(data, 1000, initializationMode="k-means||")
>> >
>> > where complete_dataframe is a pandas data frame that has my data.
>> >
>> > I get the error: Py4JNetworkError: An error occurred while trying to connect
>> > to the Java server.
>> >
>> > The error trace is as follows:
>> >
>> >> ----------------------------------------
>> >> Exception happened during processing of request from ('127.0.0.1', 41360)
>> >> Traceback (most recent call last):
>> >>   File "/usr/lib64/python2.6/SocketServer.py", line 283, in _handle_request_noblock
>> >>     self.process_request(request, client_address)
>> >>   File "/usr/lib64/python2.6/SocketServer.py", line 309, in process_request
>> >>     self.finish_request(request, client_address)
>> >>   File "/usr/lib64/python2.6/SocketServer.py", line 322, in finish_request
>> >>     self.RequestHandlerClass(request, client_address, self)
>> >>   File "/usr/lib64/python2.6/SocketServer.py", line 617, in __init__
>> >>     self.handle()
>> >>   File "/root/spark/python/pyspark/accumulators.py", line 235, in handle
>> >>     num_updates = read_int(self.rfile)
>> >>   File "/root/spark/python/pyspark/serializers.py", line 544, in read_int
>> >>     raise EOFError
>> >> EOFError
>> >> ----------------------------------------
>> >>
>> >> ---------------------------------------------------------------------------
>> >> Py4JNetworkError                          Traceback (most recent call last)
>> >> <ipython-input-13-3dd00c2c5e93> in <module>()
>> >> ----> 1 model = KMeans.train(data, 1000, initializationMode="k-means||")
>> >>
>> >> /root/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k, maxIterations, runs, initializationMode, seed, initializationSteps, epsilon)
>> >>     134         """Train a k-means clustering model."""
>> >>     135         model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations,
>> >> --> 136                               runs, initializationMode, seed, initializationSteps, epsilon)
>> >>     137         centers = callJavaFunc(rdd.context, model.clusterCenters)
>> >>     138         return KMeansModel([c.toArray() for c in centers])
>> >>
>> >> /root/spark/python/pyspark/mllib/common.pyc in callMLlibFunc(name, *args)
>> >>     126     sc = SparkContext._active_spark_context
>> >>     127     api = getattr(sc._jvm.PythonMLLibAPI(), name)
>> >> --> 128     return callJavaFunc(sc, api, *args)
>> >>     129
>> >>     130
>> >>
>> >> /root/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args)
>> >>     119     """ Call Java Function """
>> >>     120     args = [_py2java(sc, a) for a in args]
>> >> --> 121     return _java2py(sc, func(*args))
>> >>     122
>> >>     123
>> >>
>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>> >>     534         END_COMMAND_PART
>> >>     535
>> >> --> 536         answer = self.gateway_client.send_command(command)
>> >>     537         return_value = get_return_value(answer, self.gateway_client,
>> >>     538                                         self.target_id, self.name)
>> >>
>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry)
>> >>     367             if retry:
>> >>     368                 #print_exc()
>> >> --> 369                 response = self.send_command(command)
>> >>     370             else:
>> >>     371                 response = ERROR
>> >>
>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry)
>> >>     360         the Py4J protocol.
>> >>     361         """
>> >> --> 362         connection = self._get_connection()
>> >>     363         try:
>> >>     364             response = connection.send_command(command)
>> >>
>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in _get_connection(self)
>> >>     316             connection = self.deque.pop()
>> >>     317         except Exception:
>> >> --> 318             connection = self._create_connection()
>> >>     319         return connection
>> >>     320
>> >>
>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in _create_connection(self)
>> >>     323         connection = GatewayConnection(self.address, self.port,
>> >>     324             self.auto_close, self.gateway_property)
>> >> --> 325         connection.start()
>> >>     326         return connection
>> >>     327
>> >>
>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in start(self)
>> >>     430             'server'
>> >>     431             logger.exception(msg)
>> >> --> 432             raise Py4JNetworkError(msg)
>> >>     433
>> >>     434     def close(self):
>> >>
>> >> Py4JNetworkError: An error occurred while trying to connect to the Java server
>> >
>> > Is there any specific setting that I am missing, that causes this error?
>> >
>> > Thanks and Regards,
>> > Rogers Jeffrey L
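
For reference, here is a rough back-of-the-envelope way to estimate the driver-side memory the k-means cluster centers need, following Xiangrui's calculation quoted above. It is only a sketch under simplifying assumptions (dense centers at 8 bytes per double, roughly one centers-sized update per partition collected on the driver; JVM object overhead and Py4J copies are ignored), and the helper name below is made up for illustration:

def estimate_kmeans_driver_memory_gb(num_features, k, num_partitions, bytes_per_double=8):
    # Dense cluster centers take k * num_features doubles.
    centers_bytes = k * num_features * bytes_per_double
    # Each partition may send back an update roughly the size of the centers,
    # so the driver can hold about num_partitions copies at once.
    collected_bytes = centers_bytes * num_partitions
    return collected_bytes / (1024.0 ** 3)

print(estimate_kmeans_driver_memory_gb(80000, 1000, 10))  # ~6 GB, matching the thread
print(estimate_kmeans_driver_memory_gb(80000, 1500, 10))  # ~9 GB for K=1500

If the estimate approaches or exceeds the configured driver heap, raise it when launching pyspark, e.g. --conf "spark.driver.memory=15g", as discussed earlier in the thread.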