Re: java server error - spark
hey, Thanks. Now it worked.. :) On Wed, Jun 15, 2016 at 6:59 PM, Jeff Zhang wrote: > Then the only solution is to increase your driver memory but still > restricted by your machine's memory. "--driver-memory" > > On Thu, Jun 16, 2016 at 9:53 AM, spR wrote: > >> Hey, >> >> But I just have one machine. I am running everything on my laptop. Won't >> I be able to do this processing in local mode then? >> >> Regards, >> Tejaswini >> >> On Wed, Jun 15, 2016 at 6:32 PM, Jeff Zhang wrote: >> >>> You are using local mode, --executor-memory won't take effect for >>> local mode, please use other cluster mode. >>> >>> On Thu, Jun 16, 2016 at 9:32 AM, Jeff Zhang wrote: >>> >>>> Specify --executor-memory in your spark-submit command. >>>> >>>> >>>> >>>> On Thu, Jun 16, 2016 at 9:01 AM, spR wrote: >>>> >>>>> Thank you. Can you pls tell How to increase the executor memory? >>>>> >>>>> >>>>> >>>>> On Wed, Jun 15, 2016 at 5:59 PM, Jeff Zhang wrote: >>>>> >>>>>> >>> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded >>>>>> >>>>>> >>>>>> It is OOM on the executor. Please try to increase executor memory. >>>>>> "--executor-memory" >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> On Thu, Jun 16, 2016 at 8:54 AM, spR wrote: >>>>>> >>>>>>> Hey, >>>>>>> >>>>>>> error trace - >>>>>>> >>>>>>> hey, >>>>>>> >>>>>>> >>>>>>> error trace - >>>>>>> >>>>>>> >>>>>>> ---Py4JJavaError >>>>>>> Traceback (most recent call >>>>>>> last) in ()> 1 temp.take(2) >>>>>>> >>>>>>> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/pyspark/sql/dataframe.pyc >>>>>>> in take(self, num)304 with SCCallSiteSync(self._sc) as >>>>>>> css:305 port = >>>>>>> self._sc._jvm.org.apache.spark.sql.execution.EvaluatePython.takeAndServe(--> >>>>>>> 306 self._jdf, num)307 return >>>>>>> list(_load_from_socket(port, BatchedSerializer(PickleSerializer( >>>>>>> 308 >>>>>>> >>>>>>> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py >>>>>>> in __call__(self, *args)811 answer = >>>>>>> self.gateway_client.send_command(command)812 return_value = >>>>>>> get_return_value(--> 813 answer, self.gateway_client, >>>>>>> self.target_id, self.name)814 >>>>>>> 815 for temp_arg in temp_args: >>>>>>> >>>>>>> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/pyspark/sql/utils.pyc >>>>>>> in deco(*a, **kw) 43 def deco(*a, **kw): 44 >>>>>>> try:---> 45 return f(*a, **kw) 46 except >>>>>>> py4j.protocol.Py4JJavaError as e: 47 s = >>>>>>> e.java_exception.toString() >>>>>>> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py >>>>>>> in get_return_value(answer, gateway_client, target_id, name)306 >>>>>>> raise Py4JJavaError(307 "An error >>>>>>> occurred while calling {0}{1}{2}.\n".--> 308 >>>>>>> format(target_id, ".", name), value)309 else: >>>>>>> 310 raise Py4JError( >>>>>>> Py4JJavaError: An error occurred while calling >>>>>>> z:org.apache.spark.sql.execution.EvaluatePython.takeAndServe. >>>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: >>>>>>> Task 0 in stage 3.0 fai
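In local mode the executors run inside the single driver JVM, so the heap has to be sized when that JVM is launched; that is why the "--driver-memory" flag resolved this, while setting spark.executor.memory on an already-running context could not. A minimal sketch of one way to do that from an IPython notebook follows; the 8g figure and the PYSPARK_SUBMIT_ARGS approach are illustrative assumptions, not something stated in this thread:

import os
# Must run before any SparkContext is created, because the driver JVM heap is fixed at launch.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 8g pyspark-shell"

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[4]").setAppName("My app")
sc = SparkContext(conf=conf)

The same flag can instead be passed on the command line when launching pyspark or spark-submit.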
Re: java server error - spark
Hey, But I just have one machine. I am running everything on my laptop. Won't I be able to do this processing in local mode then? Regards, Tejaswini On Wed, Jun 15, 2016 at 6:32 PM, Jeff Zhang wrote: > You are using local mode, --executor-memory won't take effect for local > mode, please use other cluster mode. > > On Thu, Jun 16, 2016 at 9:32 AM, Jeff Zhang wrote: > >> Specify --executor-memory in your spark-submit command. >> >> >> >> On Thu, Jun 16, 2016 at 9:01 AM, spR wrote: >> >>> Thank you. Can you pls tell How to increase the executor memory? >>> >>> >>> >>> On Wed, Jun 15, 2016 at 5:59 PM, Jeff Zhang wrote: >>> >>>> >>> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded >>>> >>>> >>>> It is OOM on the executor. Please try to increase executor memory. >>>> "--executor-memory" >>>> >>>> >>>> >>>> >>>> >>>> On Thu, Jun 16, 2016 at 8:54 AM, spR wrote: >>>> >>>>> Hey, >>>>> >>>>> error trace - >>>>> >>>>> hey, >>>>> >>>>> >>>>> error trace - >>>>> >>>>> >>>>> ---Py4JJavaError >>>>> Traceback (most recent call >>>>> last) in ()> 1 temp.take(2) >>>>> >>>>> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/pyspark/sql/dataframe.pyc >>>>> in take(self, num)304 with SCCallSiteSync(self._sc) as css: >>>>> 305 port = >>>>> self._sc._jvm.org.apache.spark.sql.execution.EvaluatePython.takeAndServe(--> >>>>> 306 self._jdf, num)307 return >>>>> list(_load_from_socket(port, BatchedSerializer(PickleSerializer( >>>>> 308 >>>>> >>>>> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py >>>>> in __call__(self, *args)811 answer = >>>>> self.gateway_client.send_command(command)812 return_value = >>>>> get_return_value(--> 813 answer, self.gateway_client, >>>>> self.target_id, self.name)814 >>>>> 815 for temp_arg in temp_args: >>>>> >>>>> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/pyspark/sql/utils.pyc >>>>> in deco(*a, **kw) 43 def deco(*a, **kw): 44 try:---> >>>>> 45 return f(*a, **kw) 46 except >>>>> py4j.protocol.Py4JJavaError as e: 47 s = >>>>> e.java_exception.toString() >>>>> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py >>>>> in get_return_value(answer, gateway_client, target_id, name)306 >>>>>raise Py4JJavaError(307 "An error >>>>> occurred while calling {0}{1}{2}.\n".--> 308 >>>>> format(target_id, ".", name), value)309 else: >>>>> 310 raise Py4JError( >>>>> Py4JJavaError: An error occurred while calling >>>>> z:org.apache.spark.sql.execution.EvaluatePython.takeAndServe. >>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task >>>>> 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in >>>>> stage 3.0 (TID 76, localhost): java.lang.OutOfMemoryError: GC overhead >>>>> limit exceeded >>>>> at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2205) >>>>> at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1984) >>>>> at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3403) >>>>> at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470) >>>>> at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3105) >>>>> at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2336) >>>>> at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2729) >>>>> at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2549) >>>>> at >>>>> com.mysql.jdbc.PreparedStatement.execut
Re: java server error - spark
hey, I did this in my notebook. But still I get the same error. Is this the right way to do it? from pyspark import SparkConf conf = (SparkConf() .setMaster("local[4]") .setAppName("My app") .set("spark.executor.memory", "12g")) sc.conf = conf On Wed, Jun 15, 2016 at 5:59 PM, Jeff Zhang wrote: > >>> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded > > > It is OOM on the executor. Please try to increase executor memory. > "--executor-memory" > > > > > > On Thu, Jun 16, 2016 at 8:54 AM, spR wrote: > >> Hey, >> >> error trace - >> >> hey, >> >> >> error trace - >> >> >> ---Py4JJavaError >> Traceback (most recent call >> last) in ()> 1 temp.take(2) >> >> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/pyspark/sql/dataframe.pyc >> in take(self, num)304 with SCCallSiteSync(self._sc) as css: >> 305 port = >> self._sc._jvm.org.apache.spark.sql.execution.EvaluatePython.takeAndServe(--> >> 306 self._jdf, num)307 return >> list(_load_from_socket(port, BatchedSerializer(PickleSerializer( >> 308 >> >> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py >> in __call__(self, *args)811 answer = >> self.gateway_client.send_command(command)812 return_value = >> get_return_value(--> 813 answer, self.gateway_client, >> self.target_id, self.name)814 >> 815 for temp_arg in temp_args: >> >> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/pyspark/sql/utils.pyc >> in deco(*a, **kw) 43 def deco(*a, **kw): 44 try:---> 45 >>return f(*a, **kw) 46 except >> py4j.protocol.Py4JJavaError as e: 47 s = >> e.java_exception.toString() >> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py >> in get_return_value(answer, gateway_client, target_id, name)306 >> raise Py4JJavaError(307 "An error occurred >> while calling {0}{1}{2}.\n".--> 308 format(target_id, >> ".", name), value)309 else: >> 310 raise Py4JError( >> Py4JJavaError: An error occurred while calling >> z:org.apache.spark.sql.execution.EvaluatePython.takeAndServe. 
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 >> in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 >> (TID 76, localhost): java.lang.OutOfMemoryError: GC overhead limit exceeded >> at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2205) >> at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1984) >> at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3403) >> at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470) >> at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3105) >> at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2336) >> at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2729) >> at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2549) >> at >> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861) >> at >> com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1962) >> at >> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.(JDBCRDD.scala:363) >> at >> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:339) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) >> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >> at org.apache.spark.scheduler.Task.run(Task.scala:89) >> at org.apache.spark.executor.Executor$TaskRunner
Re: java server error - spark
Thank you. Can you pls tell How to increase the executor memory? On Wed, Jun 15, 2016 at 5:59 PM, Jeff Zhang wrote: > >>> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded > > > It is OOM on the executor. Please try to increase executor memory. > "--executor-memory" > > > > > > On Thu, Jun 16, 2016 at 8:54 AM, spR wrote: > >> Hey, >> >> error trace - >> >> hey, >> >> >> error trace - >> >> >> ---Py4JJavaError >> Traceback (most recent call >> last) in ()> 1 temp.take(2) >> >> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/pyspark/sql/dataframe.pyc >> in take(self, num)304 with SCCallSiteSync(self._sc) as css: >> 305 port = >> self._sc._jvm.org.apache.spark.sql.execution.EvaluatePython.takeAndServe(--> >> 306 self._jdf, num)307 return >> list(_load_from_socket(port, BatchedSerializer(PickleSerializer( >> 308 >> >> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py >> in __call__(self, *args)811 answer = >> self.gateway_client.send_command(command)812 return_value = >> get_return_value(--> 813 answer, self.gateway_client, >> self.target_id, self.name)814 >> 815 for temp_arg in temp_args: >> >> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/pyspark/sql/utils.pyc >> in deco(*a, **kw) 43 def deco(*a, **kw): 44 try:---> 45 >>return f(*a, **kw) 46 except >> py4j.protocol.Py4JJavaError as e: 47 s = >> e.java_exception.toString() >> /Users/my/Documents/My_Study_folder/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py >> in get_return_value(answer, gateway_client, target_id, name)306 >> raise Py4JJavaError(307 "An error occurred >> while calling {0}{1}{2}.\n".--> 308 format(target_id, >> ".", name), value)309 else: >> 310 raise Py4JError( >> Py4JJavaError: An error occurred while calling >> z:org.apache.spark.sql.execution.EvaluatePython.takeAndServe. 
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 >> in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 >> (TID 76, localhost): java.lang.OutOfMemoryError: GC overhead limit exceeded >> at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2205) >> at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1984) >> at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3403) >> at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470) >> at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3105) >> at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2336) >> at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2729) >> at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2549) >> at >> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861) >> at >> com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1962) >> at >> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.(JDBCRDD.scala:363) >> at >> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:339) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) >> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >> at org.apache.spark.scheduler.Task.run(Task.scala:89) >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >
Re: java server error - spark
Failed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply$mcI$sp(python.scala:126) at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply(python.scala:124) at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply(python.scala:124) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086) at org.apache.spark.sql.execution.EvaluatePython$.takeAndServe(python.scala:124) at org.apache.spark.sql.execution.EvaluatePython.takeAndServe(python.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2205) at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1984) at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3403) at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470) at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3105) at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2336) at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2729) at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2549) at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861) at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1962) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.(JDBCRDD.scala:363) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:339) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at 
org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more On Wed, Jun 15, 2016 at 5:39 PM, Jeff Zhang wrote: > Could you paste the full stacktrace ? > > On Thu, Jun 16, 2016 at 7:24 AM, spR wrote: > >> Hi, >> I am getting this error while executing a query using sqlcontext.sql >> >> The table has around 2.5 gb of data to be scanned. >> >> First I get out of memory exception. But I have 16 gb of ram >> >> Then my notebook dies and I get below error >> >> Py4JNetworkError: An error occurred while trying to connect to the Java >> server >> >> >> Thank You >> > > > > -- > Best Regards > > Jeff Zhang >
java server error - spark
Hi, I am getting this error while executing a query using sqlContext.sql. The table has around 2.5 GB of data to be scanned. First I get an out-of-memory exception, even though I have 16 GB of RAM. Then my notebook dies and I get the error below:
Py4JNetworkError: An error occurred while trying to connect to the Java server
Thank You
Re: concat spark dataframes
Hey, There are quite a lot of fields. But, there are no common fields between the 2 dataframes. Can I not concatenate the 2 frames like we can do in pandas such that the resulting dataframe has columns from both the dataframes? Thank you. Regards, Misha On Wed, Jun 15, 2016 at 3:44 PM, Mohammed Guller wrote: > Hi Misha, > > What is the schema for both the DataFrames? And what is the expected > schema of the resulting DataFrame? > > > > Mohammed > > Author: Big Data Analytics with Spark > <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/> > > > > *From:* Natu Lauchande [mailto:nlaucha...@gmail.com] > *Sent:* Wednesday, June 15, 2016 2:07 PM > *To:* spR > *Cc:* user > *Subject:* Re: concat spark dataframes > > > > Hi, > > You can select the common collumns and use DataFrame.union all . > > Regards, > > Natu > > > > On Wed, Jun 15, 2016 at 8:57 PM, spR wrote: > > hi, > > > > how to concatenate spark dataframes? I have 2 frames with certain columns. > I want to get a dataframe with columns from both the other frames. > > > > Regards, > > Misha > > >
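If the two frames shared a schema, unionAll (as Natu suggests) would stack their rows. For a pandas-style, column-wise concatenation there is no single built-in in Spark 1.6; one workaround, sketched below under the assumption that both frames have the same number of rows and that row order is meaningful, is to attach a positional index with zipWithIndex and join on it (df1, df2 and sqlContext are placeholder names):

def with_row_index(df, sql_ctx):
    # Turn each Row into a tuple and append its position so two equally long frames can be joined side by side.
    indexed = df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],))
    return sql_ctx.createDataFrame(indexed, df.columns + ["_row_id"])

combined = (with_row_index(df1, sqlContext)
            .join(with_row_index(df2, sqlContext), "_row_id")
            .drop("_row_id"))

This shuffles both frames, so for large data a real join key is preferable to a positional one.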
data too long
I am trying to save a Spark dataframe to a MySQL database using:
df.write(sql_url, table='db.table')
The values in the first column of the dataframe seem too long, and I get this error:
Data too long for column 'custid' at row 1
What should I do? Thanks
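That message comes from MySQL: some value being written to 'custid' is wider than the target column. The usual fixes are to widen the column on the MySQL side (ALTER TABLE) or to trim/validate the values before writing. A sketch of the latter, assuming a VARCHAR(20) column purely for illustration and using the DataFrameWriter JDBC path (df, sql_url and the credentials are placeholders):

from pyspark.sql.functions import col, length, substring

# How many rows would actually be truncated? (20 is the assumed column width.)
print(df.filter(length(col("custid")) > 20).count())

trimmed = df.withColumn("custid", substring(col("custid"), 1, 20))
trimmed.write.jdbc(url=sql_url, table="db.table", mode="append",
                   properties={"user": "...", "password": "..."})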
concat spark dataframes
Hi, how do I concatenate Spark dataframes? I have 2 frames, each with certain columns, and I want to get a dataframe with the columns from both frames. Regards, Misha
Re: processing 50 gb data using just one machine
Thanks! got that. I was worried about the time itself. On Wed, Jun 15, 2016 at 10:10 AM, Sergio Fernández wrote: > In theory yes... the common sense say that: > > volume / resources = time > > So more volume on the same processing resources would just take more time. > On Jun 15, 2016 6:43 PM, "spR" wrote: > >> I have 16 gb ram, i7 >> >> Will this config be able to handle the processing without my ipythin >> notebook dying? >> >> The local mode is for testing purpose. But, I do not have any cluster at >> my disposal. So can I make this work with the configuration that I have? >> Thank you. >> On Jun 15, 2016 9:40 AM, "Deepak Goel" wrote: >> >>> What do you mean by "EFFECIENTLY"? >>> >>> Hey >>> >>> Namaskara~Nalama~Guten Tag~Bonjour >>> >>> >>>-- >>> Keigu >>> >>> Deepak >>> 73500 12833 >>> www.simtree.net, dee...@simtree.net >>> deic...@gmail.com >>> >>> LinkedIn: www.linkedin.com/in/deicool >>> Skype: thumsupdeicool >>> Google talk: deicool >>> Blog: http://loveandfearless.wordpress.com >>> Facebook: http://www.facebook.com/deicool >>> >>> "Contribute to the world, environment and more : >>> http://www.gridrepublic.org >>> " >>> >>> On Wed, Jun 15, 2016 at 9:33 PM, spR wrote: >>> >>>> Hi, >>>> >>>> can I use spark in local mode using 4 cores to process 50gb data >>>> effeciently? >>>> >>>> Thank you >>>> >>>> misha >>>> >>> >>>
Re: processing 50 gb data using just one machine
I meant local mode is testing purpose generally. But, I have to use the entire 50gb data. On Wed, Jun 15, 2016 at 10:14 AM, Deepak Goel wrote: > If it is just for test purpose, why not use a smaller size of data and > test it on your notebook. When you go for the cluster, you can go for 50GB > (I am a newbie so my thought would be very naive) > > Hey > > Namaskara~Nalama~Guten Tag~Bonjour > > >-- > Keigu > > Deepak > 73500 12833 > www.simtree.net, dee...@simtree.net > deic...@gmail.com > > LinkedIn: www.linkedin.com/in/deicool > Skype: thumsupdeicool > Google talk: deicool > Blog: http://loveandfearless.wordpress.com > Facebook: http://www.facebook.com/deicool > > "Contribute to the world, environment and more : > http://www.gridrepublic.org > " > > On Wed, Jun 15, 2016 at 10:40 PM, Sergio Fernández > wrote: > >> In theory yes... the common sense say that: >> >> volume / resources = time >> >> So more volume on the same processing resources would just take more time. >> On Jun 15, 2016 6:43 PM, "spR" wrote: >> >>> I have 16 gb ram, i7 >>> >>> Will this config be able to handle the processing without my ipythin >>> notebook dying? >>> >>> The local mode is for testing purpose. But, I do not have any cluster at >>> my disposal. So can I make this work with the configuration that I have? >>> Thank you. >>> On Jun 15, 2016 9:40 AM, "Deepak Goel" wrote: >>> >>>> What do you mean by "EFFECIENTLY"? >>>> >>>> Hey >>>> >>>> Namaskara~Nalama~Guten Tag~Bonjour >>>> >>>> >>>>-- >>>> Keigu >>>> >>>> Deepak >>>> 73500 12833 >>>> www.simtree.net, dee...@simtree.net >>>> deic...@gmail.com >>>> >>>> LinkedIn: www.linkedin.com/in/deicool >>>> Skype: thumsupdeicool >>>> Google talk: deicool >>>> Blog: http://loveandfearless.wordpress.com >>>> Facebook: http://www.facebook.com/deicool >>>> >>>> "Contribute to the world, environment and more : >>>> http://www.gridrepublic.org >>>> " >>>> >>>> On Wed, Jun 15, 2016 at 9:33 PM, spR wrote: >>>> >>>>> Hi, >>>>> >>>>> can I use spark in local mode using 4 cores to process 50gb data >>>>> effeciently? >>>>> >>>>> Thank you >>>>> >>>>> misha >>>>> >>>> >>>> >
Re: processing 50 gb data using just one machine
I have 16 GB of RAM and an i7. Will this config be able to handle the processing without my IPython notebook dying? The local mode is for testing purposes, but I do not have any cluster at my disposal. So can I make this work with the configuration that I have? Thank you. On Jun 15, 2016 9:40 AM, "Deepak Goel" wrote: > What do you mean by "EFFECIENTLY"? > > Hey > > Namaskara~Nalama~Guten Tag~Bonjour > > >-- > Keigu > > Deepak > 73500 12833 > www.simtree.net, dee...@simtree.net > deic...@gmail.com > > LinkedIn: www.linkedin.com/in/deicool > Skype: thumsupdeicool > Google talk: deicool > Blog: http://loveandfearless.wordpress.com > Facebook: http://www.facebook.com/deicool > > "Contribute to the world, environment and more : > http://www.gridrepublic.org > " > > On Wed, Jun 15, 2016 at 9:33 PM, spR wrote: > >> Hi, >> >> can I use spark in local mode using 4 cores to process 50gb data >> effeciently? >> >> Thank you >> >> misha >> > >
update mysql in spark
hi, can we write a update query using sqlcontext? sqlContext.sql("update act1 set loc = round(loc,4)") what is wrong in this? I get the following error. Py4JJavaError: An error occurred while calling o20.sql. : java.lang.RuntimeException: [1.1] failure: ``with'' expected but identifier update found update act1 set loc = round(loc,4) ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36) at org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67) at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211) at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211) at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114) at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:113) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890) at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208) at org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:43) at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:231) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:745)
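The parser error is expected: Spark SQL in 1.6 only understands query (SELECT-style) statements, and DataFrames are immutable, so there is no UPDATE. The usual pattern is either to run the UPDATE directly against MySQL with an ordinary JDBC/Python client, or to read the table, transform it, and write the result back, roughly as sketched below (mysql_url, the credentials and the output table name are placeholders):

from pyspark.sql.functions import round as sql_round

act1 = sqlContext.read.jdbc(url=mysql_url, table="act1",
                            properties={"user": "...", "password": "..."})
updated = act1.withColumn("loc", sql_round("loc", 4))

# Write to a new table; overwriting "act1" while it is still being read from is unsafe.
updated.write.jdbc(url=mysql_url, table="act1_rounded", mode="overwrite",
                   properties={"user": "...", "password": "..."})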
processing 50 gb data using just one machine
Hi, can I use Spark in local mode with 4 cores to process 50 GB of data efficiently? Thank you, misha
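Broadly, yes (as Sergio notes, it mainly costs time), provided the job streams through the data rather than trying to hold all 50 GB in memory at once: Spark processes one partition per core at a time, so the main levers are partition size and avoiding operations that materialize huge intermediate results. A rough sketch, with the path and partition count as illustrative assumptions only:

# ~400 partitions over ~50 GB keeps each partition near 128 MB, so 4 local cores
# only hold a few partitions' worth of raw input in memory at any moment.
raw = sc.textFile("/data/50gb_logs/*.log", minPartitions=400)

counts = (raw.map(lambda line: (line.split("\t")[0], 1))
             .reduceByKey(lambda a, b: a + b))
counts.saveAsTextFile("/data/out/counts")

Wide operations (large joins, groupBy on high-cardinality keys, collect()) are where a single 16 GB machine usually falls over.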
Re: representing RDF literals as vertex properties
OK, have waded into implementing this and have gotten pretty far, but am now hitting something I don't understand, a NoSuchMethodError. The code looks like [...]

val conf = new SparkConf().setAppName(appName)
//conf.set("fs.default.name", "file://");
val sc = new SparkContext(conf)
val lines = sc.textFile(inFileArg)
val foo = lines.count()
val edgeTmp = lines.map( line => line.split(" ").slice(0,3)).
  // following filters omit comments, so no need to specifically filter for comments ("#")
  filter(x => x(0).startsWith("<") && x(0).endsWith(">") && x(2).startsWith("<") && x(2).endsWith(">")).
  map(x => Edge(hashToVId(x(0)), hashToVId(x(2)), x(1)))
edgeTmp.foreach( edge => print(edge+"\n"))
val edges: RDD[Edge[String]] = edgeTmp
println("edges.count="+edges.count)
val properties: RDD[(VertexId, Map[String, Any])] = lines.map( line => line.split(" ").slice(0,3)).
  filter(x => !x(0).startsWith("#")).   // omit RDF comments
  filter(x => !x(2).startsWith("<") || !x(2).endsWith(">")).
  map(x => {
    val m: Tuple2[VertexId, Map[String, Any]] = (hashToVId(x(0)), Map((x(1).toString, x(2))))
    m
  })
properties.foreach( prop => print(prop+"\n"))
val G = Graph(properties, edges)   /// < this is line 114
println(G)

The (short) traceback looks like

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.graphx.Graph$.apply$default$4()Lorg/apache/spark/storage/StorageLevel;
	at com.cray.examples.spark.graphx.lubm.query9$.main(query9.scala:114)
	at com.cray.examples.spark.graphx.lubm.query9.main(query9.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Is the method that's not found (".../StorageLevel") something I need to initialize? Using this same code on a toy problem works fine. BTW, this is Spark 1.0, running locally on my laptop. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/representing-RDF-literals-as-vertex-properties-tp20404p20582.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
representing RDF literals as vertex properties
@ankurdave's concise code at https://gist.github.com/ankurdave/587eac4d08655d0eebf9, responding to an earlier thread (http://apache-spark-user-list.1001560.n3.nabble.com/How-to-construct-graph-in-graphx-tt16335.html#a16355) shows how to build a graph with multiple edge-types ("predicates" in RDF-speak). I'm also looking at how to represent literals as vertex properties. It seems one way to do this is via positional convention in an Array/Tuple/List that is the VD; i.e., to represent height, weight, and eyeColor, the VD could be a Tuple3(Double, Double, String). If any of the properties can be present or not, then it seems the code needs to be precise about which elements of the Array/Tuple/List are present and which are not. E.g., to assign only weight, it could be Tuple3(Option(Double), 123.4, Option(String)). Given that vertices can have many many properties, it seems memory consumption for the properties should be as parsimonious as possible. Will any of Array/Tuple/List support sparse usage? Is Option the way to get there? Is this a reasonable approach for representing vertex properties, or is there a better way? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/representing-RDF-literals-as-vertex-properties-tp20404.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
how to convert System.currentTimeMillis to calendar time
Apologies for what seems an egregiously simple question, but I can't find the answer anywhere. I have timestamps from the Spark Streaming Time() interface, in milliseconds since an epoch, and I want to print out a human-readable calendar date and time. How does one do that? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-convert-System-currentTimeMillis-to-calendar-time-tp18856.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: "overloaded method value updateStateByKey ... cannot be applied to ..." when Key is a Tuple2
After comparing with previous code, I got it to work by making the return a Some instead of a Tuple2. Perhaps some day I will understand this.

spr wrote
> --code
>
> val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => {
>   val currentCount = if (values.isEmpty) 0 else values.map( x => x._1).sum
>   val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map( x => x._2).min
>
>   val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis)))
>
>   // (currentCount + previousCount, Seq(minTime, newMinTime).min)   <== old
>   Some(currentCount + previousCount, Seq(minTime, newMinTime).min)  // <== new
> }

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/overloaded-method-value-updateStateByKey-cannot-be-applied-to-when-Key-is-a-Tuple2-tp18644p18750.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
"overloaded method value updateStateByKey ... cannot be applied to ..." when Key is a Tuple2
I am creating a workflow; I have an existing call to updateStateByKey that works fine, but when I created a second use where the key is a Tuple2, it's now failing with the dreaded "overloaded method value updateStateByKey with alternatives ... cannot be applied to ..." Comparing the two uses I'm not seeing anything that seems broken, though I do note that all the messages below describe what the code provides as Time as org.apache.spark.streaming.Time. a) Could the Time v org.apache.spark.streaming.Time difference be causing this? (I'm using Time the same in the first use, which appears to work properly.) b) Any suggestions of what else could be causing the error? --code val ssc = new StreamingContext(conf, Seconds(timeSliceArg)) ssc.checkpoint(".") var lines = ssc.textFileStream(dirArg) var linesArray = lines.map( line => (line.split("\t"))) var DnsSvr = linesArray.map( lineArray => ( (lineArray(4), lineArray(5)), (1 , Time((lineArray(0).toDouble*1000).toLong) )) ) val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => { val currentCount = if (values.isEmpty) 0 else values.map( x => x._1).sum val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map( x => x._2).min val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis))) (currentCount + previousCount, Seq(minTime, newMinTime).min) } var DnsSvrCum = DnsSvr.updateStateByKey[(Int, Time)](updateDnsCount) // <=== error here --compilation output-- [error] /Users/spr/Documents/.../cyberSCinet.scala:513: overloaded method value updateStateByKey with alternatives: [error] (updateFunc: Iterator[((String, String), Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)])] => Iterator[((String, String), (Int, org.apache.spark.streaming.Time))],partitioner: org.apache.spark.Partitioner,rememberPartitioner: Boolean)(implicit evidence$5: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))] [error] (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int, org.apache.spark.streaming.Time)],partitioner: org.apache.spark.Partitioner)(implicit evidence$4: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))] [error] (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int, org.apache.spark.streaming.Time)],numPartitions: Int)(implicit evidence$3: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))] [error] (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int, org.apache.spark.streaming.Time)])(implicit evidence$2: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))] [error] cannot be applied to ((Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => (Int, org.apache.spark.streaming.Time)) [error] var DnsSvrCum = DnsSvr.updateStateByKey[(Int, Time)](updateDnsCount) -- View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/overloaded-method-value-updateStateByKey-cannot-be-applied-to-when-Key-is-a-Tuple2-tp18644.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
in function prototypes?
I am creating a workflow; I have an existing call to updateStateByKey that works fine, but when I created a second use where the key is a Tuple2, it's now failing with the dreaded "overloaded method value updateStateByKey with alternatives ... cannot be applied to ..." Comparing the two uses I'm not seeing anything that seems broken, though I do note that all the messages below describe what the code provides as Time as org.apache.spark.streaming.Time. a) Could the Time v org.apache.spark.streaming.Time difference be causing this? (I'm using Time the same in the first use, which appears to work properly.) b) Any suggestions of what else could be causing the error? --code val ssc = new StreamingContext(conf, Seconds(timeSliceArg)) ssc.checkpoint(".") var lines = ssc.textFileStream(dirArg) var linesArray = lines.map( line => (line.split("\t"))) var DnsSvr = linesArray.map( lineArray => ( (lineArray(4), lineArray(5)), (1 , Time((lineArray(0).toDouble*1000).toLong) )) ) val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => { val currentCount = if (values.isEmpty) 0 else values.map( x => x._1).sum val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map( x => x._2).min val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis))) (currentCount + previousCount, Seq(minTime, newMinTime).min) } var DnsSvrCum = DnsSvr.updateStateByKey[(Int, Time)](updateDnsCount) // <=== error here --compilation output-- [error] /Users/spr/Documents/.../cyberSCinet.scala:513: overloaded method value updateStateByKey with alternatives: [error] (updateFunc: Iterator[((String, String), Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)])] => Iterator[((String, String), (Int, org.apache.spark.streaming.Time))],partitioner: org.apache.spark.Partitioner,rememberPartitioner: Boolean)(implicit evidence$5: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))] [error] (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int, org.apache.spark.streaming.Time)],partitioner: org.apache.spark.Partitioner)(implicit evidence$4: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))] [error] (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int, org.apache.spark.streaming.Time)],numPartitions: Int)(implicit evidence$3: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))] [error] (updateFunc: (Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => Option[(Int, org.apache.spark.streaming.Time)])(implicit evidence$2: scala.reflect.ClassTag[(Int, org.apache.spark.streaming.Time)])org.apache.spark.streaming.dstream.DStream[((String, String), (Int, org.apache.spark.streaming.Time))] [error] cannot be applied to ((Seq[(Int, org.apache.spark.streaming.Time)], Option[(Int, org.apache.spark.streaming.Time)]) => (Int, org.apache.spark.streaming.Time)) [error] var DnsSvrCum = DnsSvr.updateStateByKey[(Int, Time)](updateDnsCount) -- View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/in-function-prototypes-tp18642.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
how to blend a DStream and a broadcast variable?
My use case has one large data stream (DS1) that obviously maps to a DStream. The processing of DS1 involves filtering it for any of a set of known values, which will change over time, though slowly by streaming standards. If the filter data were static, it seems to obviously map to a broadcast variable, but it's dynamic. (And I don't think it works to implement it as a DStream, because the new values need to be copied redundantly to all executors, not partitioned among the executors.) Looking at the Spark and Spark Streaming documentation, I have two questions: 1) There's no mention in the Spark Streaming Programming Guide of broadcast variables. Do they coexist properly? 2) Once I have a broadcast variable in place in the "periodic function" that Spark Streaming executes, how can I update its value? Obviously I can't literally update the value of that broadcast variable, which is immutable, but how can I get a new version of the variable established in all the executors? (And the other ever-present implicit question...) 3) Is there a better way to implement this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-blend-a-DStream-and-a-broadcast-variable-tp18227.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: with SparkStreeaming spark-submit, don't see output after ssc.start()
This problem turned out to be a cockpit error. I had the same class name defined in a couple different files, and didn't realize SBT was compiling them all together, and then executing the "wrong" one. Mea culpa. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/with-SparkStreeaming-spark-submit-don-t-see-output-after-ssc-start-tp17989p18224.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Streaming: which code is (not) executed at every batch interval?
Good, thanks for the clarification. It would be great if this were precisely stated somewhere in the docs. :) To state this another way, it seems like there's no way to straddle the streaming world and the non-streaming world; to get input from both a (vanilla, Linux) file and a stream. Is that true? If so, it seems I need to turn my (vanilla file) data into a second stream. sowen wrote > Yes, code is just local Scala code unless it's invoking Spark APIs. > The "non-Spark-streaming" block appears to just be normal program code > executed in your driver, which ultimately starts the streaming > machinery later. It executes once; there is nothing about that code > connected to Spark. It's not magic. > > To execute code against every RDD you use operations like foreachRDD > on DStream to write a function that is executed at each batch interval > on an RDD. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-which-code-is-not-executed-at-every-batch-interval-tp18071p18087.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming appears not to recognize a more recent version of an already-seen file; true?
Holden Karau wrote > This is the expected behavior. Spark Streaming only reads new files once, > this is why they must be created through an atomic move so that Spark > doesn't accidentally read a partially written file. I'd recommend looking > at "Basic Sources" in the Spark Streaming guide ( > http://spark.apache.org/docs/latest/streaming-programming-guide.html ). Thanks for the quick response. OK, this does seem consistent with the rest of Spark Streaming. Looking at Basic Sources, it says "Once moved [into the directory being observed], the files must not be changed." I don't think of removing a file and creating a new one under the same name as "changing" the file, i.e., it has a different inode number. It might be more precise to say something like "Once a filename has been detected by Spark Streaming, it will be viewed as having been processed for the life of the context." This end-case also implies that any filename-generating code has to be certain it will not create repeats within the life of a context, which is not easily deduced from the existing description. Thanks again. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-appears-not-to-recognize-a-more-recent-version-of-an-already-seen-file-true-tp18074p18076.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark Streaming appears not to recognize a more recent version of an already-seen file; true?
I am trying to implement a use case that takes some human input. Putting that in a single file (as opposed to a collection of HDFS files) would be a simpler human interface, so I tried an experiment with whether Spark Streaming (via textFileStream) will recognize a new version of a filename it has already digested. (Yes, I'm deleting and moving a new file into the same name, not modifying in place.) It appears the answer is No, it does not recognize a new version. Can one of the experts confirm a) this is true and b) this is intended? Experiment: - run an existing program that works to digest new files in a directory - modify the data-creation script to put the new files always under the same name instead of different names, then run the script Outcome: it sees the first file under that name, but none of the subsequent files (with different contents, which would show up in output). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-appears-not-to-recognize-a-more-recent-version-of-an-already-seen-file-true-tp18074.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Streaming: which code is (not) executed at every batch interval?
The use case I'm working on has a main data stream in which a human needs to modify what to look for. I'm thinking to implement the main data stream with Spark Streaming and the things to look for with Spark. (Better approaches welcome.) To do this, I have intermixed Spark and Spark Streaming code, and it appears that the Spark code is not being executed every batch interval. With details elided, it looks like val sc = new SparkContext(conf) val ssc = new StreamingContext(conf, Seconds(10)) ssc.checkpoint(".") var lines = ssc.textFileStream(dirArg) // Spark Streaming code var linesArray = lines.map( line => (line.split("\t"))) val whiteFd = (new java.io.File(whiteArg)) // non-Spark-Streaming code if (whiteFd.lastModified > System.currentTimeMillis-(timeSliceArg*1000)) { // read the file into a var // Spark Streaming code var SvrCum = newState.updateStateByKey[(Int, Time, Time)](updateMyState) It appears the non-Spark-Streaming code gets executed once at program initiation but not repeatedly. So, two questions: 1) Is it correct that Spark code does not get executed per batch interval? 2) Is there a definition somewhere of what code will and will not get executed per batch interval? (I didn't find it in either the Spark or Spark Streaming programming guides.) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-which-code-is-not-executed-at-every-batch-interval-tp18071.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: with SparkStreeaming spark-submit, don't see output after ssc.start()
Yes, good catch. I also realized, after I posted, that I was calling 2 different classes, though they are in the same JAR. I went back and tried it again with the same class in both cases, and it failed the same way. I thought perhaps having 2 classes in a JAR was an issue, but commenting out one of the classes did not seem to make a difference. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/with-SparkStreeaming-spark-submit-don-t-see-output-after-ssc-start-tp17989p18066.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: with SparkStreeaming spark-submit, don't see output after ssc.start()
P.S. I believe I am creating output from the Spark Streaming app, and thus not falling into the "no-output, no-execution" pitfall, as at the end I have newServers.print() newServers.saveAsTextFiles("newServers","out") -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/with-SparkStreeaming-spark-submit-don-t-see-output-after-ssc-start-tp17989p18003.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
with SparkStreeaming spark-submit, don't see output after ssc.start()
I have a Spark Streaming program that works fine if I execute it via sbt "runMain com.cray.examples.spark.streaming.cyber.StatefulDhcpServerHisto -f /Users/spr/Documents/<...>/tmp/ -t 10" but if I start it via $S/bin/spark-submit --master local[12] --class StatefulNewDhcpServers target/scala-2.10/newd*jar -f /Users/spr/Documents/<...>/tmp/ -t 10 (where $S points to the base of the Spark installation), it prints the output of print statements before the ssc.start() but nothing after that. I might well have screwed up something, but I'm getting no output anywhere AFAICT. I have set spark.eventLog.enabled to True in my spark-defaults.conf file. The Spark History Server at localhost:18080 says "no completed applications found". There must be some log output somewhere. Any ideas? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/with-SparkStreeaming-spark-submit-don-t-see-output-after-ssc-start-tp17989.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: does updateStateByKey accept a state that is a tuple?
Based on execution on small test cases, it appears that the construction below does what I intend. (Yes, all those Tuple1()s were superfluous.)

var lines = ssc.textFileStream(dirArg)
var linesArray = lines.map( line => (line.split("\t")))
var newState = linesArray.map( lineArray => ((lineArray(4),
  (1, Time((lineArray(0).toDouble*1000).toLong), Time((lineArray(0).toDouble*1000).toLong))) ))

val updateDhcpState = (newValues: Seq[(Int, Time, Time)], state: Option[(Int, Time, Time)]) =>
  Option[(Int, Time, Time)] {
    val newCount = newValues.map( x => x._1).sum
    val newMinTime = newValues.map( x => x._2).min
    val newMaxTime = newValues.map( x => x._3).max

    val (count, minTime, maxTime) = state.getOrElse((0, Time(Int.MaxValue), Time(Int.MinValue)))

    (count+newCount, Seq(minTime, newMinTime).min, Seq(maxTime, newMaxTime).max)
  }

var DhcpSvrCum = newState.updateStateByKey[(Int, Time, Time)](updateDhcpState)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/does-updateStateByKey-accept-a-state-that-is-a-tuple-tp17756p17828.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: does updateStateByKey accept a state that is a tuple?
I think I understand how to deal with this, though I don't have all the code working yet. The point is that the V of (K, V) can itself be a tuple. So the updateFunc prototype looks something like val updateDhcpState = (newValues: Seq[Tuple1[(Int, Time, Time)]], state: Option[Tuple1[(Int, Time, Time)]]) => Option[Tuple1[(Int, Time, Time)]] { ... } And I'm wondering whether those Tuple1()s are superfluous. Film at 11. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/does-updateStateByKey-accept-a-state-that-is-a-tuple-tp17756p17769.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
does updateStateByKey accept a state that is a tuple?
I'm trying to implement a Spark Streaming program to calculate the number of instances of a given key encountered and the minimum and maximum times at which it was encountered. updateStateByKey seems to be just the thing, but when I define the "state" to be a tuple, I get compile errors I'm not finding a way around. Perhaps it's something simple, but I'm stumped. var lines = ssc.textFileStream(dirArg) var linesArray = lines.map( line => (line.split("\t"))) var newState = linesArray.map( lineArray => (lineArray(4), 1, Time((lineArray(0).toDouble*1000).toInt), Time((lineArray(0).toDouble*1000).toInt))) val updateDhcpState = (newValues: Seq[(Int, Time, Time)], state: Option[(Int, Time, Time)]) => { val newCount = newValues.map( x => x._1).sum val newMinTime = newValues.map( x => x._2).min val newMaxTime = newValues.map( x => x._3).max val (count, minTime, maxTime) = state.getOrElse((0, Time(Int.MaxValue), Time(Int.MinValue))) Some((count+newCount, Seq(minTime, newMinTime).min, Seq(maxTime, newMaxTime).max)) //(count+newCount, Seq(minTime, newMinTime).min, Seq(maxTime, newMaxTime).max) } var DhcpSvrCum = newState.updateStateByKey[(Int, Time, Time)](updateDhcpState) The error I get is [info] Compiling 3 Scala sources to /Users/spr/Documents/.../target/scala-2.10/classes... [error] /Users/spr/Documents/...StatefulDhcpServersHisto.scala:95: value updateStateByKey is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int, org.apache.spark.streaming.Time, org.apache.spark.streaming.Time)] [error] var DhcpSvrCum = newState.updateStateByKey[(Int, Time, Time)](updateDhcpState) I don't understand why the String is being prepended to the tuple I expect (Int, Time, Time). In the main example (StatefulNetworkWordCount, here <https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala> ), the data is a stream of (String, Int) tuples created by val wordDstream = words.map(x => (x, 1)) and the updateFunc ignores the String key in its definition val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.sum val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } Is there some special-casing of a key with a simple (non-tuple) value? How could this work with a tuple value? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/does-updateStateByKey-accept-a-state-that-is-a-tuple-tp17756.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: what does DStream.union() do?
I need more precision to understand this. If the elements of one DStream/RDD are (String) and the elements of the other are (Time, Int), what does "union" mean? I'm hoping for (String, Time, Int), but that appears optimistic. :) Do the elements have to be of homogeneous type?

Holden Karau wrote
> The union function simply returns a DStream with the elements from both.
> This is the same behavior as when we call union on RDDs :) (You can think
> of union as similar to the union operator on sets except without the
> unique element restrictions).
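As a point of reference (a sketch, not from the thread): union requires both DStreams to have the same element type and simply merges their elements; it does not pair elements up, so a String stream and a (Time, Int) stream cannot be unioned into (String, Time, Int).

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.DStream

    val conf = new SparkConf().setAppName("union-sketch").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Same element type: union just yields all elements from both streams.
    val a: DStream[String] = ssc.textFileStream("dirA")   // hypothetical directories
    val b: DStream[String] = ssc.textFileStream("dirB")
    val merged: DStream[String] = a.union(b)

    // Different element types do not compile:
    //   val c: DStream[(Time, Int)] = ...
    //   a.union(c)   // type mismatch: required DStream[String]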
how to extract/combine elements of an Array in DStream element?
I am processing a log file, from each line of which I want to extract the zeroth and 4th elements (and an integer 1 for counting) into a tuple. I had hoped to be able to index the Array for elements 0 and 4, but Arrays appear not to support vector indexing. I'm not finding a way to extract and combine the elements properly, perhaps due to being a SparkStreaming/Scala newbie. My code so far looks like:

    1]  var lines = ssc.textFileStream(dirArg)
    2]  var linesArray = lines.map( line => (line.split("\t")))
    3]  var respH = linesArray.map( lineArray => lineArray(4) )
    4a] var time = linesArray.map( lineArray => lineArray(0) )
    4b] var time = linesArray.map( lineArray => (lineArray(0), 1))
    5]  var newState = respH.union(time)

If I use line 4a and not 4b, it compiles properly. (I still have issues getting my update function for updateStateByKey working, so I don't know if it _works_ properly.) If I use line 4b and not 4a, it fails at compile time with

    [error] foo.scala:82: type mismatch;
    [error]  found   : org.apache.spark.streaming.dstream.DStream[(String, Int)]
    [error]  required: org.apache.spark.streaming.dstream.DStream[String]
    [error]     var newState = respH.union(time)

This implies that the DStreams being union()ed have to be of identical per-element type. Can anyone confirm that's true? If so, is there a way to extract the needed elements and build the new DStream?
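For what it's worth, a sketch (untested) of building the desired tuples in a single map, which avoids union altogether; it reuses the names and field indices from lines 1] and 2] above:

    // Pull fields 0 and 4 out of each split line and attach a count of 1.
    val newState = linesArray.map( lineArray => (lineArray(0), lineArray(4), 1) )
    // newState: DStream[(String, String, Int)]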
what does DStream.union() do?
The documentation at https://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.streaming.dstream.DStream describes the union() method as "Return a new DStream by unifying data of another DStream with this DStream." Can somebody provide a clear definition of what "unifying" means in this context? Does it append corresponding elements together? Inside a wider tuple if need be? I'm hoping for something clear enough that it could just be added to the doc page if the developers so chose.
Re: SparkStreaming program does not start
Thanks Abraham Jacob, Tobias Pfeiffer, Akhil Das-2, and Sean Owen for your helpful comments. Cockpit error on my part: I was passing the .scala file as a command-line argument rather than redirecting stdin from it.
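Spelled out with the command lines from the earlier message (the second form is the one that worked here):

    # Passing the script as an argument just starts an interactive shell;
    # the file is never executed:
    $S/bin/spark-shell --master local[5] try1.scala

    # Redirecting stdin from the file makes the shell execute it:
    $S/bin/spark-shell --master local[5] < try1.scala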
Re: SparkStreaming program does not start
> Try using spark-submit instead of spark-shell

Two questions:

- What does spark-submit do differently from spark-shell that makes you think that may be the cause of my difficulty?

- When I try spark-submit it complains about "Error: Cannot load main class from JAR: file:/Users/spr/.../try1.scala". My program is not structured as a main class. Does it have to be to run with Spark Streaming? Or with spark-submit?

Thanks much.
SparkStreaming program does not start
I'm probably doing something obviously wrong, but I'm not seeing it. I have the program below (in a file try1.scala), which is similar but not identical to the examples.

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._

    println("Point 0")
    val appName = "try1.scala"
    val master = "local[5]"
    val conf = new SparkConf().setAppName(appName).setMaster(master)
    val ssc = new StreamingContext(conf, Seconds(10))
    println("Point 1")

    val lines = ssc.textFileStream("/Users/spr/Documents/big_data/RSA2014/")
    println("Point 2")
    println("lines="+lines)
    println("Point 3")

    ssc.start()
    println("Point 4")
    ssc.awaitTermination()
    println("Point 5")

I start the program via

    $S/bin/spark-shell --master local[5] try1.scala

The messages I get are

    mbp-spr:cyber spr$ $S/bin/spark-shell --master local[5] try1.scala
    14/10/07 17:36:58 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    14/10/07 17:36:58 INFO SecurityManager: Changing view acls to: spr
    14/10/07 17:36:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spr)
    14/10/07 17:36:58 INFO HttpServer: Starting HTTP Server
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 1.0.2
          /_/

    Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_65)
    Type in expressions to have them evaluated.
    Type :help for more information.
    14/10/07 17:37:01 INFO SecurityManager: Changing view acls to: spr
    14/10/07 17:37:01 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spr)
    14/10/07 17:37:01 INFO Slf4jLogger: Slf4jLogger started
    14/10/07 17:37:01 INFO Remoting: Starting remoting
    14/10/07 17:37:02 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@192.168.0.3:58351]
    14/10/07 17:37:02 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@192.168.0.3:58351]
    14/10/07 17:37:02 INFO SparkEnv: Registering MapOutputTracker
    14/10/07 17:37:02 INFO SparkEnv: Registering BlockManagerMaster
    14/10/07 17:37:02 INFO DiskBlockManager: Created local directory at /var/folders/pk/2bm2rq8n0rv499w5s9_c_w6r3b/T/spark-local-20141007173702-054c
    14/10/07 17:37:02 INFO MemoryStore: MemoryStore started with capacity 303.4 MB.
    14/10/07 17:37:02 INFO ConnectionManager: Bound socket to port 58352 with id = ConnectionManagerId(192.168.0.3,58352)
    14/10/07 17:37:02 INFO BlockManagerMaster: Trying to register BlockManager
    14/10/07 17:37:02 INFO BlockManagerInfo: Registering block manager 192.168.0.3:58352 with 303.4 MB RAM
    14/10/07 17:37:02 INFO BlockManagerMaster: Registered BlockManager
    14/10/07 17:37:02 INFO HttpServer: Starting HTTP Server
    14/10/07 17:37:02 INFO HttpBroadcast: Broadcast server started at http://192.168.0.3:58353
    14/10/07 17:37:02 INFO HttpFileServer: HTTP File server directory is /var/folders/pk/2bm2rq8n0rv499w5s9_c_w6r3b/T/spark-0950f667-aa04-4f6e-9d2e-5a9fab30806c
    14/10/07 17:37:02 INFO HttpServer: Starting HTTP Server
    14/10/07 17:37:02 INFO SparkUI: Started SparkUI at http://192.168.0.3:4040
    2014-10-07 17:37:02.428 java[27725:1607] Unable to load realm mapping info from SCDynamicStore
    14/10/07 17:37:02 INFO Executor: Using REPL class URI: http://192.168.0.3:58350
    14/10/07 17:37:02 INFO SparkILoop: Created spark context..
    Spark context available as sc.

Note no messages from any of my "println()" statements.
I could understand that I'm possibly screwing up something in the code, but why am I getting no print-out at all? Suggestions?
how to group within the messages at a vertex?
Sorry if this is in the docs someplace and I'm missing it. I'm trying to implement label propagation in GraphX. The core step of that algorithm is: for each vertex, find the most frequent label among its neighbors and set its label to that. (I think) I see how to get the input from all the neighbors, but I don't see how to group/reduce those to find the most frequent label.

    var G2 = G.mapVertices((id,attr) => id)
    val perSrcCount: VertexRDD[(Long, Long)] = G2.mapReduceTriplets[(Long, Long)](
      edge => Iterator((edge.dstAttr, (edge.srcAttr, 1))),
      (a,b) => ((a._1), (a._2 + b._2))   // this line seems broken
    )

It seems that on the "broken" line above I don't want to reduce all the values to a scalar, as this code does, but rather group them first and then reduce them. Can I do that all within mapReduceTriplets? If not, how do I build something that I can then further reduce? Thanks in advance.
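One pattern that seems to fit (a sketch, untested against this code, but roughly how GraphX's bundled label-propagation example handles the same grouping problem) is to make the message type a map from label to count, merge the maps in the reduce function, and only afterwards pick each vertex's most frequent label:

    // Each edge sends the source's label as a one-entry count map to the destination.
    // The reduce function merges maps by summing counts, so nothing collapses to a scalar.
    val labelCounts: VertexRDD[Map[Long, Long]] = G2.mapReduceTriplets[Map[Long, Long]](
      edge => Iterator((edge.dstId, Map(edge.srcAttr -> 1L))),
      (a, b) => (a.keySet ++ b.keySet).map { label =>
        label -> (a.getOrElse(label, 0L) + b.getOrElse(label, 0L))
      }.toMap
    )

    // Each vertex then adopts its most frequent neighbor label (ties broken arbitrarily);
    // vertices that received no messages keep their current label.
    val nextG = G2.outerJoinVertices(labelCounts) { (id, oldLabel, counts) =>
      counts.map(_.maxBy(_._2)._1).getOrElse(oldLabel)
    }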
Re: noob: how to extract different members of a VertexRDD
ankurdave wrote
> val g = ...
> val newG = g.mapVertices((id, attr) => id)
> // newG.vertices has type VertexRDD[VertexId], or RDD[(VertexId, VertexId)]

Yes, that worked perfectly. Thanks much.

One follow-up question. If I just wanted to get those values into a vanilla variable (not a VertexRDD or Graph or ...) so I could easily look at them in the REPL, what would I do? Are the aggregate data structures inside the VertexRDD/Graph/... Arrays or Lists or what, or do I even need to know/care?

Thanks.
Steve
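A sketch of the sort of thing that seems to answer the follow-up (assuming newG from the quoted reply): collect() pulls the distributed data back to the driver as an ordinary Array, which is easy to inspect in the REPL.

    import org.apache.spark.graphx.VertexId

    // A plain local Array of (vertexId, attribute) pairs:
    val pairs: Array[(VertexId, VertexId)] = newG.vertices.collect()

    // Or just the attribute values, dropping the ids:
    val vals: Array[VertexId] = newG.vertices.map(_._2).collect()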
noob: how to extract different members of a VertexRDD
I'm a Scala / Spark / GraphX newbie, so may be missing something obvious. I have a set of edges that I read into a graph. For an iterative community-detection algorithm, I want to assign each vertex to a community with the name of the vertex. Intuitively it seems like I should be able to pull the vertexID out of the VertexRDD and build a new VertexRDD with 2 Int attributes. Unfortunately I'm not finding the recipe to unpack the VertexRDD into the vertexID and attribute pieces.

The code snippet that builds the graph looks like

    import org.apache.spark._
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    val G = GraphLoader.edgeListFile(sc, "[[...]]clique_5_2_3.edg")

Poking at G to see what it looks like, I see

    scala> :type G.vertices
    org.apache.spark.graphx.VertexRDD[Int]

    scala> G.vertices.collect()
    res1: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((10002,1), (4,1), (10001,1), (1,1), (0,1), (1,1), (10003,1), (3,1), (10004,1), (2,1))

I've tried several ways to pull out just the first element of each tuple into a new variable, with no success.

    scala> var (x: Int) = G.vertices
    :21: error: type mismatch;
     found   : org.apache.spark.graphx.VertexRDD[Int]
     required: Int
           var (x: Int) = G.vertices
                            ^

    scala> val x: Int = G.vertices._1
    :21: error: value _1 is not a member of org.apache.spark.graphx.VertexRDD[Int]
           val x: Int = G.vertices._1
                                   ^

What am I missing?
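A sketch (not from the thread) of why the two attempts above fail and what the types suggest instead: G.vertices is a distributed collection of (VertexId, Int) pairs rather than a single tuple, so the ids have to be pulled out element by element.

    import org.apache.spark.graphx.VertexId
    import org.apache.spark.rdd.RDD

    // The ids alone, still as a distributed RDD:
    val ids: RDD[VertexId] = G.vertices.map { case (id, attr) => id }

    // Or, as in the reply above, a graph whose vertex attribute is its own id
    // (the "each vertex starts in its own community" initialization):
    val selfLabeled = G.mapVertices((id, attr) => id)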