Re: Discourse: A proposed alternative to the Spark User list

2015-01-23 Thread Nicholas Chammas
That sounds good to me. Shall I open a JIRA / PR about updating the site
community page?
On 2015년 1월 23일 (금) at 오전 4:37 Patrick Wendell patr...@databricks.com
wrote:

 Hey Nick,

 So I think what we can do is encourage people to participate on the
 Stack Overflow topic, and this I think we can do on the Spark website
 as a first class community resource for Spark. We should probably be
 spending more time on that site given its popularity.

 In terms of encouraging this explicitly *to replace* the ASF mailing
 list, that I think is harder to do. The ASF makes a lot of effort to
 host its own infrastructure that is neutral and not associated with
 any corporation. And by and large the ASF policy is to consider that
 as the de-facto forum of communication for any project.

 Personally, I wish the ASF would update this policy - for instance, by
 allowing the use of third party lists or communication fora - provided
 that they allow exporting the conversation if those sites were to
 change course. However, the state of the art stands as such.

 - Patrick

 On Wed, Jan 21, 2015 at 8:43 AM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  Josh / Patrick,
 
  What do y’all think of the idea of promoting Stack Overflow as a place to
  ask questions over this list, as long as the questions fit SO’s
 guidelines
  (how-to-ask, dont-ask)?
 
  The apache-spark tag is very active on there.
 
  Discussions of all types are still on-topic here, but when possible we
 want
  to encourage people to use SO.
 
  Nick
 
  On Wed Jan 21 2015 at 8:37:05 AM Jay Vyas jayunit100.apa...@gmail.com
 wrote:
 
  Its a very valid  idea indeed, but... It's a tricky  subject since the
  entire ASF is run on mailing lists , hence there are so many different
 but
  equally sound ways of looking at this idea, which conflict with one
 another.
 
   On Jan 21, 2015, at 7:03 AM, btiernay btier...@hotmail.com wrote:
  
   I think this is a really great idea for really opening up the
   discussions
   that happen here. Also, it would be nice to know why there doesn't
 seem
   to
   be much interest. Maybe I'm misunderstanding some nuance of Apache
   projects.
  
   Cheers
  
  
  
   --
   View this message in context:
   http://apache-spark-user-list.1001560.n3.nabble.com/
 Discourse-A-proposed-alternative-to-the-Spark-User-list-tp20851p21288.html
   Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 



Installing Spark Standalone to a Cluster

2015-01-23 Thread riginos
Do I need to manually install and configure Hadoop before doing anything with
Spark standalone?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Installing-Spark-Standalone-to-a-Cluster-tp21339.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Exception: NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions

2015-01-23 Thread Yana Kadiyska
If you're running the test via sbt, you can examine the classpath that sbt
uses for the test (show runtime:full-classpath, or last run) -- I find this
helps once too many includes and excludes interact.

On Thu, Jan 22, 2015 at 3:50 PM, Adrian Mocanu amoc...@verticalscope.com
wrote:


 I use spark 1.1.0-SNAPSHOT and the test I'm running is in local mode. My
 test case uses org.apache.spark.streaming.TestSuiteBase

 val spark = "org.apache.spark" %% "spark-core" % "1.1.0-SNAPSHOT" %
 "provided" excludeAll(
 val sparkStreaming = "org.apache.spark" % "spark-streaming_2.10" %
 "1.1.0-SNAPSHOT" % "provided" excludeAll(
 val sparkCassandra = "com.tuplejump" % "calliope_2.10" % "0.9.0-C2-EA"
 exclude("org.apache.cassandra", "cassandra-all")
 exclude("org.apache.cassandra", "cassandra-thrift")
 val casAll = "org.apache.cassandra" % "cassandra-all" % "2.0.3"
 intransitive()
 val casThrift = "org.apache.cassandra" % "cassandra-thrift" % "2.0.3"
 intransitive()
 val sparkStreamingFromKafka = "org.apache.spark" %
 "spark-streaming-kafka_2.10" % "0.9.1" excludeAll(
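(For comparison, a minimal hypothetical build.sbt fragment -- not Adrian's actual build -- that keeps every Spark artifact on one version and marks it provided. Note that the snippet above mixes spark-streaming-kafka_2.10 0.9.1 with 1.1.0-SNAPSHOT core, which is exactly the kind of version mismatch Sean describes below.)

    val sparkVersion = "1.1.0"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"            % sparkVersion % "provided",
      "org.apache.spark" %% "spark-streaming"       % sparkVersion % "provided",
      "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
      "com.tuplejump"    %  "calliope_2.10"         % "0.9.0-C2-EA"
        exclude("org.apache.cassandra", "cassandra-all")
        exclude("org.apache.cassandra", "cassandra-thrift")
    )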


 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: January-22-15 11:39 AM
 To: Adrian Mocanu
 Cc: u...@spark.incubator.apache.org
 Subject: Re: Exception: NoSuchMethodError:
 org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions

 NoSuchMethodError almost always means that you have compiled some code
 against one version of a library but are running against another. I wonder
 if you are including different versions of Spark in your project, or
 running against a cluster on an older version?
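(One quick, generic way to see which Spark jar the test actually loads at runtime -- a sketch, not something from this thread -- is to print the code source of a streaming class from inside the test:)

    // If this does not point at the Spark version you compile against,
    // the runtime classpath is mixed.
    println(classOf[org.apache.spark.streaming.StreamingContext]
      .getProtectionDomain.getCodeSource.getLocation)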

 On Thu, Jan 22, 2015 at 3:57 PM, Adrian Mocanu amoc...@verticalscope.com
 wrote:
  Hi
 
  I get this exception when I run a Spark test case on my local machine:
 
 
 
  An exception or error caused a run to abort:
  org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lo
  rg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lsca
  la/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/
  dstream/PairDStreamFunctions;
 
  java.lang.NoSuchMethodError:
  org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lo
  rg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lsca
  la/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/
  dstream/PairDStreamFunctions;
 
 
 
  In my test case I have these Spark related imports imports:
 
  import org.apache.spark.streaming.StreamingContext._
 
  import org.apache.spark.streaming.TestSuiteBase
 
  import org.apache.spark.streaming.dstream.DStream
 
  import
  org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
 
 
 
  -Adrian
 
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Discourse: A proposed alternative to the Spark User list

2015-01-23 Thread Gerard Maas
+1

On Fri, Jan 23, 2015 at 5:58 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 That sounds good to me. Shall I open a JIRA / PR about updating the site
 community page?
 On 2015년 1월 23일 (금) at 오전 4:37 Patrick Wendell patr...@databricks.com
 wrote:

 Hey Nick,

 So I think what we can do is encourage people to participate on the
 Stack Overflow topic, and this I think we can do on the Spark website
 as a first class community resource for Spark. We should probably be
 spending more time on that site given its popularity.

 In terms of encouraging this explicitly *to replace* the ASF mailing
 list, that I think is harder to do. The ASF makes a lot of effort to
 host its own infrastructure that is neutral and not associated with
 any corporation. And by and large the ASF policy is to consider that
 as the de-facto forum of communication for any project.

 Personally, I wish the ASF would update this policy - for instance, by
 allowing the use of third party lists or communication fora - provided
 that they allow exporting the conversation if those sites were to
 change course. However, the state of the art stands as such.

 - Patrick


 On Wed, Jan 21, 2015 at 8:43 AM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  Josh / Patrick,
 
  What do y’all think of the idea of promoting Stack Overflow as a place
 to
  ask questions over this list, as long as the questions fit SO’s
 guidelines
  (how-to-ask, dont-ask)?
 
  The apache-spark tag is very active on there.
 
  Discussions of all types are still on-topic here, but when possible we
 want
  to encourage people to use SO.
 
  Nick
 
  On Wed Jan 21 2015 at 8:37:05 AM Jay Vyas jayunit100.apa...@gmail.com
 wrote:
 
  Its a very valid  idea indeed, but... It's a tricky  subject since the
  entire ASF is run on mailing lists , hence there are so many different
 but
  equally sound ways of looking at this idea, which conflict with one
 another.
 
   On Jan 21, 2015, at 7:03 AM, btiernay btier...@hotmail.com wrote:
  
   I think this is a really great idea for really opening up the
   discussions
   that happen here. Also, it would be nice to know why there doesn't
 seem
   to
   be much interest. Maybe I'm misunderstanding some nuance of Apache
   projects.
  
   Cheers
  
  
  
   --
   View this message in context:
   http://apache-spark-user-list.1001560.n3.nabble.com/
 Discourse-A-proposed-alternative-to-the-Spark-User-
 list-tp20851p21288.html
   Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
  
   
 -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 




Installing Spark Standalone to a Cluster

2015-01-23 Thread riginos
I need someone to send me a snapshot of their conf/spark-env.sh file because I
don't understand how to set some vars like SPARK_MASTER, etc.
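(For reference, a minimal illustrative conf/spark-env.sh for a standalone cluster; the hostname and sizes below are placeholders, and conf/spark-env.sh.template documents the full list of variables.)

    #!/usr/bin/env bash
    # Illustrative values only -- adjust to your own hosts and hardware.
    export SPARK_MASTER_IP=master.example.com    # master binds here: spark://master.example.com:7077
    export SPARK_MASTER_PORT=7077                # default master port
    export SPARK_MASTER_WEBUI_PORT=8080          # master web UI
    export SPARK_WORKER_CORES=4                  # cores each worker offers to executors
    export SPARK_WORKER_MEMORY=8g                # memory each worker offers to executors
    export SPARK_WORKER_INSTANCES=1              # worker processes per node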



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Installing-Spark-Standalone-to-a-Cluster-tp21341.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Large number of pyspark.daemon processes

2015-01-23 Thread Sven Krasser
Hey all,

I am running into a problem where YARN kills containers for being over
their memory allocation (which is about 8G for executors plus 6G for
overhead), and I noticed that in those containers there are tons of
pyspark.daemon processes hogging memory. Here's a snippet from a container
with 97 pyspark.daemon processes. The total sum of RSS usage across all of
these is 1,764,956 pages (i.e. 6.7GB on the system).

Any ideas what's happening here and how I can get the number of
pyspark.daemon processes back to a more reasonable count?

2015-01-23 15:36:53,654 INFO  [Reporter] yarn.YarnAllocationHandler
(Logging.scala:logInfo(59)) - Container marked as failed:
container_1421692415636_0052_01_30. Exit status: 143. Diagnostics:
Container [pid=35211,containerID=container_1421692415636_0052_01_30]
is running beyond physical memory limits. Current usage: 14.9 GB of
14.5 GB physical memory used; 41.3 GB of 72.5 GB virtual memory used.
Killing container.
Dump of the process-tree for container_1421692415636_0052_01_30 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
FULL_CMD_LINE
|- 54101 36625 36625 35211 (python) 78 1 332730368 16834 python -m
pyspark.daemon
|- 52140 36625 36625 35211 (python) 58 1 332730368 16837 python -m
pyspark.daemon
|- 36625 35228 36625 35211 (python) 65 604 331685888 17694 python -m
pyspark.daemon

[...]


Full output here: https://gist.github.com/skrasser/e3e2ee8dede5ef6b082c

Thank you!
-Sven

-- 
http://sites.google.com/site/krasser/?utm_source=sig


Re: Results never return to driver | Spark Custom Reader

2015-01-23 Thread Yana Kadiyska
It looks to me like your executor actually crashed and didn't just finish
properly.

Can you check the executor log?

It is available in the UI, or on the worker machine, under $SPARK_HOME/work/
 app-20150123155114-/6/stderr  (unless you manually changed the work
directory location but in that case I'd assume you know where to find the
log)

On Thu, Jan 22, 2015 at 10:54 PM, Harihar Nahak hna...@wynyardgroup.com
wrote:

 Hi All,

 I wrote a custom reader to read a DB, and it is able to return key and
 value
 as expected but after it finished it never returned to driver

 here is output of worker log :
 15/01/23 15:51:38 INFO worker.ExecutorRunner: Launch command: java -cp

 ::/usr/local/spark-1.2.0-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/spark-assembly-1.2.0-hadoop2.4.0.jar:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/hadoop/etc/hadoop
 -XX:MaxPermSize=128m -Dspark.driver.port=53484 -Xms1024M -Xmx1024M
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@VM90:53484/user/CoarseGrainedScheduler 6 VM99
 4 app-20150123155114-
 akka.tcp://sparkWorker@VM99:44826/user/Worker
 15/01/23 15:51:47 INFO worker.Worker: Executor app-20150123155114-/6
 finished with state EXITED message Command exited with code 1 exitStatus 1
 15/01/23 15:51:47 WARN remote.ReliableDeliverySupervisor: Association with
 remote system [akka.tcp://sparkExecutor@VM99:57695] has failed, address is
 now gated for [5000] ms. Reason is: [Disassociated].
 15/01/23 15:51:47 INFO actor.LocalActorRef: Message
 [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
 Actor[akka://sparkWorker/deadLetters] to

 Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40143.96.25.29%3A35065-4#-915179653]
 was not delivered. [3] dead letters encountered. This logging can be turned
 off or adjusted with configuration settings 'akka.log-dead-letters' and
 'akka.log-dead-letters-during-shutdown'.
 15/01/23 15:51:49 INFO worker.Worker: Asked to kill unknown executor
 app-20150123155114-/6

 If someone noticed any clue to fixed that will really appreciate.



 -
 --Harihar
 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Results-never-return-to-driver-Spark-Custom-Reader-tp21328.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Installing Spark Standalone to a Cluster

2015-01-23 Thread HARIPRIYA AYYALASOMAYAJULA
Not needed. You can directly follow the installation instructions.
However, you might need sbt to package your files into a jar.

On Fri, Jan 23, 2015 at 11:54 AM, riginos samarasrigi...@gmail.com wrote:

 Do i need to manually install and configure hadoop before doing anything
 with
 spark standalone?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Installing-Spark-Standalone-to-a-Cluster-tp21339.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Regards,
Haripriya Ayyalasomayajula
Graduate Student
Department of Computer Science
University of Houston
Contact : 650-796-7112


Re: How to make spark partition sticky, i.e. stay with node?

2015-01-23 Thread Tathagata Das
Hello mingyu,

That is a reasonable way of doing this. Spark Streaming does not natively
support stickiness because Spark launches tasks based on data locality. If
there is no locality (for example, reduce tasks can run anywhere), the
location is randomly assigned. So the cogroup or join introduces locality,
which forces the Spark scheduler to be sticky. Another way to achieve this
is updateStateByKey, which internally uses cogroup but presents a nicer
streaming-like API for per-key stateful operations.

TD

On Fri, Jan 23, 2015 at 8:23 AM, mingyu mingyut...@gmail.com wrote:
 I found a workaround.
 I can make my auxiliary data an RDD, partition it, and cache it.
 Later, I can cogroup it with other RDDs and Spark will try to keep the
 cached RDD partitions where they are and not shuffle them.
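(For illustration, a minimal sketch of that workaround; the data, keys and partition count are made up, not from the original post.)

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._   // pair-RDD functions in Spark 1.x

    val numPartitions = 32
    // Auxiliary data, partitioned once and cached so its partitions stay put.
    val aux = sc.parallelize(Seq(("a", 1), ("b", 2)))
      .partitionBy(new HashPartitioner(numPartitions))
      .cache()

    // Cogrouping against the cached, partitioned RDD reuses its partitioner, so the
    // cached partitions are not shuffled and the work is scheduled where they live.
    val incoming = sc.parallelize(Seq(("a", 10), ("c", 30)))
    val joined = aux.cogroup(incoming)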



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-spark-partition-sticky-i-e-stay-with-node-tp21322p21338.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: spark 1.1.0 save data to hdfs failed

2015-01-23 Thread ey-chih chow
Thanks.  I looked at the dependency tree.  I did not see any dependent jar of 
hadoop-core from hadoop2.  However the jar built from maven has the class:
 org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.class
Do you know why?



Date: Fri, 23 Jan 2015 17:01:48 +
Subject: RE: spark 1.1.0 save data to hdfs failed
From: so...@cloudera.com
To: eyc...@hotmail.com

Are you receiving my replies? I have suggested a resolution. Look at the 
dependency tree next. 
On Jan 23, 2015 2:43 PM, ey-chih chow eyc...@hotmail.com wrote:



I looked into the source code of SparkHadoopMapReduceUtil.scala. I think it is
broken in the following code:

  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
    val klass = firstAvailableClass(
      "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",  // hadoop2, hadoop2-yarn
      "org.apache.hadoop.mapreduce.TaskAttemptContext")           // hadoop1
    val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
  }

In other words, it is related to hadoop2, hadoop2-yarn, and hadoop1.  Any
suggestion how to resolve it?
Thanks.


 From: so...@cloudera.com
 Date: Fri, 23 Jan 2015 14:01:45 +
 Subject: Re: spark 1.1.0 save data to hdfs failed
 To: eyc...@hotmail.com
 CC: user@spark.apache.org
 
 These are all definitely symptoms of mixing incompatible versions of 
 libraries.
 
 I'm not suggesting you haven't excluded Spark / Hadoop, but, this is
 not the only way Hadoop deps get into your app. See my suggestion
 about investigating the dependency tree.
 
 On Fri, Jan 23, 2015 at 1:53 PM, ey-chih chow eyc...@hotmail.com wrote:
  Thanks.  But I think I already mark all the Spark and Hadoop reps as
  provided.  Why the cluster's version is not used?
 
  Any way, as I mentioned in the previous message, after changing the
  hadoop-client to version 1.2.1 in my maven deps, I already pass the
  exception and go to another one as indicated below.  Any suggestion on this?
 
  =
 
  Exception in thread main java.lang.reflect.InvocationTargetException
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
  sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at
  sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at
  org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
  at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
  Caused by: java.lang.IncompatibleClassChangeError: Implementing class
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
  at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:191)
  at
  org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:73)
  at
  org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.newTaskAttemptContext(SparkHadoopMapReduceUtil.scala:35)
  at
  org.apache.spark.rdd.PairRDDFunctions.newTaskAttemptContext(PairRDDFunctions.scala:53)
  at
  org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:932)
  at
  org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832)
  at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:103)
  at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala)
 
  ... 6 more
 
  
  

spark 1.2 - Writing parquet fails for timestamp with Unsupported datatype TimestampType

2015-01-23 Thread Manoj Samel
Using Spark 1.2

Read a CSV file, apply a schema to convert it to a SchemaRDD, and then call
schemaRdd.saveAsParquetFile.

If the schema includes TimestampType, it gives the following trace when doing
the save:

Exception in thread main java.lang.RuntimeException: Unsupported datatype
TimestampType

at scala.sys.package$.error(package.scala:27)

at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(
ParquetTypes.scala:343)

at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(
ParquetTypes.scala:292)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(
ParquetTypes.scala:291)

at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(
ParquetTypes.scala:363)

at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(
ParquetTypes.scala:362)

at scala.collection.TraversableLike$$anonfun$map$1.apply(
TraversableLike.scala:244)

at scala.collection.TraversableLike$$anonfun$map$1.apply(
TraversableLike.scala:244)

at scala.collection.mutable.ResizableArray$class.foreach(
ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at scala.collection.AbstractTraversable.map(Traversable.scala:105)

at
org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(
ParquetTypes.scala:361)

at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(
ParquetTypes.scala:407)

at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(
ParquetRelation.scala:166)

at org.apache.spark.sql.parquet.ParquetRelation$.create(
ParquetRelation.scala:145)

at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(
SparkStrategies.scala:204)

at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(
QueryPlanner.scala:58)

at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(
QueryPlanner.scala:58)

at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(
QueryPlanner.scala:59)

at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(
SQLContext.scala:418)

at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(
SQLContext.scala:416)

at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(
SQLContext.scala:422)

at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(
SQLContext.scala:422)

at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(
SQLContext.scala:425)

at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425
)

at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(
SchemaRDDLike.scala:76)

at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:108)

at bdrt.MyTest$.createParquetWithDate(MyTest.scala:88)

at bdrt.MyTest$delayedInit$body.apply(MyTest.scala:54)

at scala.Function0$class.apply$mcV$sp(Function0.scala:40)

at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)

at scala.App$$anonfun$main$1.apply(App.scala:71)

at scala.App$$anonfun$main$1.apply(App.scala:71)

at scala.collection.immutable.List.foreach(List.scala:318)

at scala.collection.generic.TraversableForwarder$class.foreach(
TraversableForwarder.scala:32)

at scala.App$class.main(App.scala:71)

at bdrt.MyTest$.main(MyTest.scala:10)


Starting a spark streaming app in init.d

2015-01-23 Thread Ashic Mahtab
Hello,
I'm trying to kick off a Spark Streaming job to a standalone master using 
spark-submit inside of init.d. This is what I have:


DAEMON="spark-submit --class Streamer --executor-memory 500M --total-executor-cores 4 /path/to/assembly.jar"

start() {
    $DAEMON -p /var/run/my_assembly.pid &
    echo OK
    return 0
}

However, this will return 0 even if spark-submit fails. Is there a way to run 
spark-submit in the background and return 0 only if it successfully starts up? 
Or better yet, is there something in spark-submit that will allow me to do 
this, perhaps via a command line argument?

Thanks,
Ashic.
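(One hedged sketch of a start() that only reports success if the driver is still alive shortly after launch; it catches fast failures only, and a cluster-side check of the application state would be more robust. Paths are illustrative.)

    start() {
        nohup $DAEMON > /var/log/my_assembly.log 2>&1 &
        echo $! > /var/run/my_assembly.pid
        sleep 10                                # give spark-submit time to fail fast
        if kill -0 "$(cat /var/run/my_assembly.pid)" 2> /dev/null; then
            echo "OK"
            return 0
        else
            echo "spark-submit exited during startup; see /var/log/my_assembly.log"
            return 1
        fi
    }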
  

Spark Streaming action not triggered with Kafka inputs

2015-01-23 Thread Chen Song
I am running into some problems with Spark Streaming when reading from
Kafka. I used Spark 1.2.0 built on CDH5.
The example is based on:
https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
* It works with default implementation.
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
topicMap).map(_._2)

* However, when I changed it to parallel receiving, like shown below

val topicMap = topics.split(",").map((_, 1)).toMap
val parallelInputs = (1 to numThreads.toInt) map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
}

ssc.union(parallelInputs)
After the change, the job stages just hang there and never finish. It looks
like no action is triggered on the streaming job. When I check the
Streaming tab, it shows the messages below:
Batch Processing Statistics

   No statistics have been generated yet.


Am I doing anything wrong on the parallel receiving part?

-- 
Chen Song
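(Not confirmed in this thread, but a common cause of this symptom: each createStream call starts a receiver, and every receiver permanently occupies one core, so if the application is not given more cores than receivers there is nothing left to process batches. A sketch reusing the names above, assuming enough total cores:)

    val numReceivers = numThreads.toInt
    val streams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    }
    val lines = ssc.union(streams)   // keep the unioned DStream and operate on it
    lines.count().print()            // at least one output operation is required
    ssc.start()
    ssc.awaitTermination()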


Problems saving a large RDD (1 TB) to S3 as a sequence file

2015-01-23 Thread Darin McBeath
I've tried various ideas, but I'm really just shooting in the dark.

I have an 8 node cluster of r3.8xlarge machines. The RDD (with 1024 partitions) 
I'm trying to save off to S3 is approximately 1TB in size (with the partitions 
pretty evenly distributed in size).

I just tried a test to dial back the number of executors on my cluster from 
using the entire cluster (256 cores) down to 128.  Things seemed to get a bit 
farther (maybe) before the wheels started spinning off again.  But, the job 
always fails when all I'm trying to do is save the 1TB file to S3.

I see the following in my master log file.

15/01/23 19:01:54 WARN master.Master: Removing worker-20150123172316 because we 
got no heartbeat in 60 seconds
15/01/23 19:01:54 INFO master.Master: Removing worker worker-20150123172316 on 
15/01/23 19:01:54 INFO master.Master: Telling app of lost executor: 3

For the stage that eventually fails, I see the following summary information.

Summary Metrics for 729 Completed Tasks (min / 25th percentile / median / 75th percentile / max)
Duration              2.5 min   4.8 min   5.5 min   6.3 min   9.2 min
GC Time               0 ms      0.3 s     0.4 s     0.5 s     5 s
Shuffle Read (Remote) 309.3 MB  321.7 MB  325.4 MB  329.6 MB  350.6 MB

So, the max GC was only 5s for 729 completed tasks.  This sounds reasonable.  
As people tend to indicate GC is the reason one loses executors, this does not 
appear to be my case.

Here is a typical snapshot for some completed tasks.  So, you can see that they 
tend to complete in approximately 6 minutes.  So, it takes about 6 minutes to 
write one partition to S3 (a partition being roughly 1 GB)

65  23619   0   SUCCESS ANY 5 /  2015/01/23 18:30:32
5.8 min 0.9 s   344.6 MB 
59  23613   0   SUCCESS ANY 7 /  2015/01/23 18:30:32
6.0 min 0.4 s   324.1 MB 
68  23622   0   SUCCESS ANY 1 /  2015/01/23 18:30:32
5.7 min 0.5 s   329.9 MB 
62  23616   0   SUCCESS ANY 6 /  2015/01/23 18:30:32
5.8 min 0.7 s   326.4 MB 
61  23615   0   SUCCESS ANY 3 /  2015/01/23 18:30:32
5.5 min 1 s 335.7 MB 
64  23618   0   SUCCESS ANY 2 /  2015/01/23 18:30:32
5.6 min 2 s 328.1 MB 

Then towards the end, when things start heading south, I see the following.  
These tasks never complete but you can see that they have taken more than 47 
minutes (so far) before the job finally fails.  Not really sure why.

671 24225   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
672 24226   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
673 24227   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
674 24228   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
675 24229   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
676 24230   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
677 24231   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
678 24232   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
679 24233   0   RUNNING ANY 1 /  2015/01/23 18:59:14
47 min 
680 24234   0   RUNNING ANY 1 /  2015/01/23 18:59:17
47 min 
681 24235   0   RUNNING ANY 1 /  2015/01/23 18:59:18
47 min 
682 24236   0   RUNNING ANY 1 /  2015/01/23 18:59:18
47 min 
683 24237   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
684 24238   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
685 24239   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
686 24240   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
687 24241   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
688 24242   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
689 24243   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
690 24244   0   RUNNING ANY 5 /  2015/01/23 18:59:20
47 min 
691 24245   0   RUNNING ANY 5 /  2015/01/23 18:59:21
47 min 

What's odd is that even on the same machine (see below) some tasks are still 
completing (in less than 5 minutes) while other tasks on the same machine seem 
to be hung after 46 minutes.  Keep in mind all I'm doing is saving the file to 
S3 so one would think the amount of work per task/partition would be fairly 
equal.

694 24248   0   SUCCESS ANY 0 /  2015/01/23 18:59:32
4.5 min 0.3 s   326.5 MB 
695 24249   0   SUCCESS ANY 0 /  2015/01/23 18:59:32
4.5 min 0.3 s   330.8 MB 
696 24250   0   RUNNING ANY 0 /  2015/01/23 18:59:32
46 min 
697 24251   

Re: Discourse: A proposed alternative to the Spark User list

2015-01-23 Thread Nicholas Chammas
https://issues.apache.org/jira/browse/SPARK-5390

On Fri Jan 23 2015 at 12:05:00 PM Gerard Maas gerard.m...@gmail.com wrote:

 +1

 On Fri, Jan 23, 2015 at 5:58 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 That sounds good to me. Shall I open a JIRA / PR about updating the site
 community page?
 On 2015년 1월 23일 (금) at 오전 4:37 Patrick Wendell patr...@databricks.com
 wrote:

 Hey Nick,

 So I think what we can do is encourage people to participate on the
 Stack Overflow topic, and this I think we can do on the Spark website
 as a first class community resource for Spark. We should probably be
 spending more time on that site given its popularity.

 In terms of encouraging this explicitly *to replace* the ASF mailing
 list, that I think is harder to do. The ASF makes a lot of effort to
 host its own infrastructure that is neutral and not associated with
 any corporation. And by and large the ASF policy is to consider that
 as the de-facto forum of communication for any project.

 Personally, I wish the ASF would update this policy - for instance, by
 allowing the use of third party lists or communication fora - provided
 that they allow exporting the conversation if those sites were to
 change course. However, the state of the art stands as such.

 - Patrick


 On Wed, Jan 21, 2015 at 8:43 AM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  Josh / Patrick,
 
  What do y’all think of the idea of promoting Stack Overflow as a place
 to
  ask questions over this list, as long as the questions fit SO’s
 guidelines
  (how-to-ask, dont-ask)?
 
  The apache-spark tag is very active on there.
 
  Discussions of all types are still on-topic here, but when possible we
 want
  to encourage people to use SO.
 
  Nick
 
  On Wed Jan 21 2015 at 8:37:05 AM Jay Vyas jayunit100.apa...@gmail.com
 wrote:
 
  Its a very valid  idea indeed, but... It's a tricky  subject since the
  entire ASF is run on mailing lists , hence there are so many
 different but
  equally sound ways of looking at this idea, which conflict with one
 another.
 
   On Jan 21, 2015, at 7:03 AM, btiernay btier...@hotmail.com wrote:
  
   I think this is a really great idea for really opening up the
   discussions
   that happen here. Also, it would be nice to know why there doesn't
 seem
   to
   be much interest. Maybe I'm misunderstanding some nuance of Apache
   projects.
  
   Cheers
  
  
  
   --
   View this message in context:
   http://apache-spark-user-list.1001560.n3.nabble.com/
 Discourse-A-proposed-alternative-to-the-Spark-User-
 list-tp20851p21288.html
   Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
  
   
 -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 





Re: Problems saving a large RDD (1 TB) to S3 as a sequence file

2015-01-23 Thread Sven Krasser
Hey Darin,

Are you running this over EMR or as a standalone cluster? I've had
occasional success in similar cases by digging through all executor logs
and trying to find exceptions that are not caused by the application
shutdown (but the logs remain my main pain point with Spark).

That aside, another explanation could be S3 throttling you due to volume
(and hence causing write requests to fail). You can try to split your file
into multiple pieces and store those as S3 objects with different prefixes
to make sure they end up in different partitions in S3. See here for
details:
http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html.
If that works, that'll narrow the cause down.

Best,
-Sven


On Fri, Jan 23, 2015 at 12:04 PM, Darin McBeath ddmcbe...@yahoo.com.invalid
 wrote:

 I've tried various ideas, but I'm really just shooting in the dark.

 I have an 8 node cluster of r3.8xlarge machines. The RDD (with 1024
 partitions) I'm trying to save off to S3 is approximately 1TB in size (with
 the partitions pretty evenly distributed in size).

 I just tried a test to dial back the number of executors on my cluster
 from using the entire cluster (256 cores) down to 128.  Things seemed to
 get a bit farther (maybe) before the wheels started spinning off again.
 But, the job always fails when all I'm trying to do is save the 1TB file to
 S3.

 I see the following in my master log file.

 15/01/23 19:01:54 WARN master.Master: Removing worker-20150123172316
 because we got no heartbeat in 60 seconds
 15/01/23 19:01:54 INFO master.Master: Removing worker
 worker-20150123172316 on
 15/01/23 19:01:54 INFO master.Master: Telling app of lost executor: 3

 For the stage that eventually fails, I see the following summary
 information.

 Summary Metrics for 729 Completed Tasks
 Duration 2.5 min 4.8 min 5.5 min 6.3 min 9.2 min
 GC Time   0 ms 0.3 s 0.4 s 0.5 s 5 s

 Shuffle Read (Remote) 309.3 MB 321.7 MB 325.4 MB 329.6 MB 350.6 MB

 So, the max GC was only 5s for 729 completed tasks.  This sounds
 reasonable.  As people tend to indicate GC is the reason one loses
 executors, this does not appear to be my case.

 Here is a typical snapshot for some completed tasks.  So, you can see that
 they tend to complete in approximately 6 minutes.  So, it takes about 6
 minutes to write one partition to S3 (a partition being roughly 1 GB)

 65  23619   0   SUCCESS ANY 5 /  2015/01/23 18:30:32
   5.8 min 0.9 s   344.6 MB
 59  23613   0   SUCCESS ANY 7 /  2015/01/23 18:30:32
   6.0 min 0.4 s   324.1 MB
 68  23622   0   SUCCESS ANY 1 /  2015/01/23 18:30:32
   5.7 min 0.5 s   329.9 MB
 62  23616   0   SUCCESS ANY 6 /  2015/01/23 18:30:32
   5.8 min 0.7 s   326.4 MB
 61  23615   0   SUCCESS ANY 3 /  2015/01/23 18:30:32
   5.5 min 1 s 335.7 MB
 64  23618   0   SUCCESS ANY 2 /  2015/01/23 18:30:32
   5.6 min 2 s 328.1 MB

 Then towards the end, when things start heading south, I see the
 following.  These tasks never complete but you can see that they have taken
 more than 47 minutes (so far) before the job finally fails.  Not really
 sure why.

 671 24225   0   RUNNING ANY 1 /  2015/01/23 18:59:14
   47 min
 672 24226   0   RUNNING ANY 1 /  2015/01/23 18:59:14
   47 min
 673 24227   0   RUNNING ANY 1 /  2015/01/23 18:59:14
   47 min
 674 24228   0   RUNNING ANY 1 /  2015/01/23 18:59:14
   47 min
 675 24229   0   RUNNING ANY 1 /  2015/01/23 18:59:14
   47 min
 676 24230   0   RUNNING ANY 1 /  2015/01/23 18:59:14
   47 min
 677 24231   0   RUNNING ANY 1 /  2015/01/23 18:59:14
   47 min
 678 24232   0   RUNNING ANY 1 /  2015/01/23 18:59:14
   47 min
 679 24233   0   RUNNING ANY 1 /  2015/01/23 18:59:14
   47 min
 680 24234   0   RUNNING ANY 1 /  2015/01/23 18:59:17
   47 min
 681 24235   0   RUNNING ANY 1 /  2015/01/23 18:59:18
   47 min
 682 24236   0   RUNNING ANY 1 /  2015/01/23 18:59:18
   47 min
 683 24237   0   RUNNING ANY 5 /  2015/01/23 18:59:20
   47 min
 684 24238   0   RUNNING ANY 5 /  2015/01/23 18:59:20
   47 min
 685 24239   0   RUNNING ANY 5 /  2015/01/23 18:59:20
   47 min
 686 24240   0   RUNNING ANY 5 /  2015/01/23 18:59:20
   47 min
 687 24241   0   RUNNING ANY 5 /  2015/01/23 18:59:20
   47 min
 688 24242   0   RUNNING ANY 5 /  2015/01/23 18:59:20
   47 min
 689 24243   0   RUNNING ANY 5 /  2015/01/23 18:59:20
   47 min
 690 24244   0   

Apache Spark standalone mode: number of cores

2015-01-23 Thread olegshirokikh
I'm trying to understand the basics of Spark internals, and the Spark
documentation for submitting applications in local mode says, for the
spark-submit --master setting:

local[K] Run Spark locally with K worker threads (ideally, set this to the
number of cores on your machine).

local[*] Run Spark locally with as many worker threads as logical cores on
your machine.
Since all the data is stored on a single local machine, it does not benefit
from distributed operations on RDDs.

How does it benefit and what internally is going on when Spark utilizes
several logical cores?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-standalone-mode-number-of-cores-tp21342.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark-shell has syntax error on windows.

2015-01-23 Thread Josh Rosen
Do you mind filing a JIRA issue for this which includes the actual error
message string that you saw?  https://issues.apache.org/jira/browse/SPARK

On Thu, Jan 22, 2015 at 8:31 AM, Yana Kadiyska yana.kadiy...@gmail.com
wrote:

 I am not sure if you get the same exception as I do -- spark-shell2.cmd
 works fine for me. Windows 7 as well. I've never bothered looking to fix it
 as it seems spark-shell just calls spark-shell2 anyway...

 On Thu, Jan 22, 2015 at 3:16 AM, Vladimir Protsenko protsenk...@gmail.com
  wrote:

 I have a problem with running spark shell in windows 7. I made the
 following
 steps:

 1. downloaded and installed Scala 2.11.5
 2. downloaded spark 1.2.0 by git clone git://github.com/apache/spark.git
 3. run dev/change-version-to-2.11.sh and mvn -Dscala-2.11 -DskipTests
 clean
 package (in git bash)

 After installation tried to run spark-shell.cmd in cmd shell and it says
 there is a syntax error in file. What could I do to fix problem?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-has-syntax-error-on-windows-tp21313.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Apache Spark standalone mode: number of cores

2015-01-23 Thread Boromir Widas
Local mode still parallelizes calculations, and it is useful for debugging
because it goes through the same serialization/deserialization steps as a
cluster would.
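(A small sketch that makes the local parallelism visible, assuming a shell or job started with --master local[*]:)

    // Each element records the thread that processed it; with local[*] you should see
    // several distinct "Executor task launch worker" threads, one per core in use.
    sc.parallelize(1 to 1000, 8)
      .map(_ => Thread.currentThread.getName)
      .distinct()
      .collect()
      .foreach(println)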

On Fri, Jan 23, 2015 at 5:44 PM, olegshirokikh o...@solver.com wrote:

 I'm trying to understand the basics of Spark internals and Spark
 documentation for submitting applications in local mode says for
 spark-submit --master setting:

 local[K] Run Spark locally with K worker threads (ideally, set this to the
 number of cores on your machine).

 local[*] Run Spark locally with as many worker threads as logical cores on
 your machine.
 Since all the data is stored on a single local machine, it does not benefit
 from distributed operations on RDDs.

 How does it benefit and what internally is going on when Spark utilizes
 several logical cores?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-standalone-mode-number-of-cores-tp21342.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




RE: How to 'Pipe' Binary Data in Apache Spark

2015-01-23 Thread Venkat, Ankam
Spark Committers:  Please advise the way forward for this issue.

Thanks for your support.

Regards,
Venkat

From: Venkat, Ankam
Sent: Thursday, January 22, 2015 9:34 AM
To: 'Frank Austin Nothaft'; 'user@spark.apache.org'
Cc: 'Nick Allen'
Subject: RE: How to 'Pipe' Binary Data in Apache Spark

How much time it takes to port it?

Spark committers:  Please let us know your thoughts.

Regards,
Venkat

From: Frank Austin Nothaft [mailto:fnoth...@berkeley.edu]
Sent: Thursday, January 22, 2015 9:08 AM
To: Venkat, Ankam
Cc: Nick Allen; user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: How to 'Pipe' Binary Data in Apache Spark

Venkat,

No problem!

So, creating a custom InputFormat or using sc.binaryFiles alone is not the 
right solution.   We also need the modified version of RDD.pipe to support 
binary data?  Is my understanding correct?

Yep! That is correct. The custom InputFormat allows Spark to load binary 
formatted data from disk/HDFS/S3/etc..., but then the default RDD.pipe 
reads/writes text to a pipe, so you'd need the custom mapPartitions call.



If yes, this can be added as new enhancement Jira request?

The code that I have right now is fairly custom to my application, but if there 
was interest, I would be glad to port it for the Spark core.

Regards,

Frank Austin Nothaft
fnoth...@berkeley.edumailto:fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edumailto:fnoth...@eecs.berkeley.edu
202-340-0466

On Jan 22, 2015, at 7:11 AM, Venkat, Ankam 
ankam.ven...@centurylink.commailto:ankam.ven...@centurylink.com wrote:



Thanks Frank for your response.

So, creating a custom InputFormat or using sc.binaryFiles alone is not the 
right solution.   We also need the modified version of RDD.pipe to support 
binary data?  Is my understanding correct?

If yes, this can be added as new enhancement Jira request?

Nick:  What's your take on this?

Regards,
Venkat Ankam


From: Frank Austin Nothaft [mailto:fnoth...@berkeley.edu]
Sent: Wednesday, January 21, 2015 12:30 PM
To: Venkat, Ankam
Cc: Nick Allen; user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: How to 'Pipe' Binary Data in Apache Spark

Hi Venkat/Nick,

The Spark RDD.pipe method pipes text data into a subprocess and then receives 
text data back from that process. Once you have the binary data loaded into an 
RDD properly, to pipe binary data to/from a subprocess (e.g., you want the data 
in the pipes to contain binary, not text), you need to implement your own, 
modified version of RDD.pipe. The implementation of RDD.pipe
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala)
spawns a process per partition (IIRC), as well as threads for
writing to and reading from the process (as well as stderr for the process).
When writing via RDD.pipe, Spark calls *.toString on the object, and pushes
that text representation down the pipe. There is an example of how to pipe
binary data from within a mapPartitions call using the Scala API in lines
107-177 of this file:
https://github.com/bigdatagenomics/avocado/blob/master/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/ExternalGenotyper.scala.
 This specific code contains some nastiness around the packaging of downstream 
libraries that we rely on in that project, so I'm not sure if it is the 
cleanest way, but it is a workable way.
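(A simplified, hypothetical sketch of that idea -- one process per record for clarity, with the command, record layout and variable names assumed rather than taken from Frank's code. A production version would keep one process per partition and pump stdin/stdout on separate threads, as PipedRDD does, to avoid deadlocking on large outputs.)

    import java.io.ByteArrayOutputStream

    // wavs: RDD[(String, Array[Byte])], e.g. sc.binaryFiles(path).mapValues(_.toArray())
    val piped = wavs.mapPartitions { iter =>
      iter.map { case (name, bytes) =>
        val proc = new ProcessBuilder("sox", "-t", "wav", "-", "-n", "stats")
          .redirectErrorStream(true)   // sox prints stats on stderr; merge it into stdout
          .start()
        val stdin = proc.getOutputStream
        stdin.write(bytes)             // push the raw binary through the pipe
        stdin.close()                  // EOF lets the tool finish
        val out = new ByteArrayOutputStream()
        val buf = new Array[Byte](8192)
        val stdout = proc.getInputStream
        var n = stdout.read(buf)
        while (n != -1) { out.write(buf, 0, n); n = stdout.read(buf) }
        proc.waitFor()
        (name, out.toString("UTF-8"))
      }
    }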

Regards,

Frank Austin Nothaft
fnoth...@berkeley.edumailto:fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edumailto:fnoth...@eecs.berkeley.edu
202-340-0466

On Jan 21, 2015, at 9:17 AM, Venkat, Ankam 
ankam.ven...@centurylink.commailto:ankam.ven...@centurylink.com wrote:




I am trying to solve similar problem.  I am using option # 2 as suggested by 
Nick.

I have created an RDD with sc.binaryFiles for a list of .wav files.  But, I am 
not able to pipe it to the external programs.

For example:
 sq = sc.binaryFiles(wavfiles)  -- All .wav files stored on wavfiles 
 directory on HDFS
 sq.keys().collect() -- works fine.  Shows the list of file names.
 sq.values().collect() -- works fine.  Shows the content of the files.
 sq.values().pipe(lambda x: subprocess.call(['/usr/local/bin/sox', '-t' 
 'wav', '-', '-n', 'stats'])).collect() -- Does not work.  Tried different 
 options.
AttributeError: 'function' object has no attribute 'read'

Any suggestions?

Regards,
Venkat Ankam

From: Nick Allen [mailto:n...@nickallen.org]
Sent: Friday, January 16, 2015 11:46 AM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: How to 'Pipe' Binary Data in Apache Spark

I just wanted to reiterate the solution for the benefit of the community.

The problem is not from my use of 'pipe', but that 'textFile' cannot be used to 
read in binary data. (Doh) There are a couple options to move forward.

1. Implement a custom 'InputFormat' that understands the binary input data. 
(Per Sean Owen)

2. Use 'SparkContext.binaryFiles' to read in the entire binary file as a single 
record. This will impact performance as it 

Re: spark-shell has syntax error on windows.

2015-01-23 Thread Yana Kadiyska
https://issues.apache.org/jira/browse/SPARK-5389

I marked as minor since I also just discovered that I can run it under
PowerShell just fine. Vladimir, feel free to change the bug if you're
getting a different message or a more serious issue.

On Fri, Jan 23, 2015 at 4:44 PM, Josh Rosen rosenvi...@gmail.com wrote:

 Do you mind filing a JIRA issue for this which includes the actual error
 message string that you saw?  https://issues.apache.org/jira/browse/SPARK

 On Thu, Jan 22, 2015 at 8:31 AM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 I am not sure if you get the same exception as I do -- spark-shell2.cmd
 works fine for me. Windows 7 as well. I've never bothered looking to fix it
 as it seems spark-shell just calls spark-shell2 anyway...

 On Thu, Jan 22, 2015 at 3:16 AM, Vladimir Protsenko 
 protsenk...@gmail.com wrote:

 I have a problem with running spark shell in windows 7. I made the
 following
 steps:

 1. downloaded and installed Scala 2.11.5
 2. downloaded spark 1.2.0 by git clone git://github.com/apache/spark.git
 3. run dev/change-version-to-2.11.sh and mvn -Dscala-2.11 -DskipTests
 clean
 package (in git bash)

 After installation tried to run spark-shell.cmd in cmd shell and it says
 there is a syntax error in file. What could I do to fix problem?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-has-syntax-error-on-windows-tp21313.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Problems saving a large RDD (1 TB) to S3 as a sequence file

2015-01-23 Thread Darin McBeath
Thanks for the ideas Sven.

I'm using stand-alone cluster (Spark 1.2).
FWIW, I was able to get this running (just now).  This is the first time it's 
worked in probably my last 10 attempts.

In addition to limiting the executors to only 50% of the cluster, I added/changed 
the settings below.  Maybe I just got lucky (although I think not).  It would be 
good if someone could weigh in and agree that these changes are sensible.  I'm 
also hoping the support for placement groups (targeted for 1.3 in the ec2 
scripts) will help the situation.  All in all, it takes about 45 minutes to 
write a 1 TB file back to S3 (as 1024 partitions).


SparkConf conf = new SparkConf()
    .setAppName("SparkSync Application")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.rdd.compress", "true")
    .set("spark.core.connection.ack.wait.timeout", "600")
    .set("spark.akka.timeout", "600")            // Increased from 300
    .set("spark.akka.threads", "16")             // Added so that default was increased from 4 to 16
    .set("spark.task.maxFailures", "64")         // Didn't really matter as I had no failures in this run
    .set("spark.storage.blockManagerSlaveTimeoutMs", "30");



From: Sven Krasser kras...@gmail.com
To: Darin McBeath ddmcbe...@yahoo.com 
Cc: User user@spark.apache.org 
Sent: Friday, January 23, 2015 5:12 PM
Subject: Re: Problems saving a large RDD (1 TB) to S3 as a sequence file



Hey Darin,

Are you running this over EMR or as a standalone cluster? I've had occasional 
success in similar cases by digging through all executor logs and trying to 
find exceptions that are not caused by the application shutdown (but the logs 
remain my main pain point with Spark).

That aside, another explanation could be S3 throttling you due to volume (and 
hence causing write requests to fail). You can try to split your file into 
multiple pieces and store those as S3 objects with different prefixes to make 
sure they end up in different partitions in S3. See here for details: 
http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html.
 If that works, that'll narrow the cause down.

Best,
-Sven






On Fri, Jan 23, 2015 at 12:04 PM, Darin McBeath ddmcbe...@yahoo.com.invalid 
wrote:

I've tried various ideas, but I'm really just shooting in the dark.

I have an 8 node cluster of r3.8xlarge machines. The RDD (with 1024 
partitions) I'm trying to save off to S3 is approximately 1TB in size (with 
the partitions pretty evenly distributed in size).

I just tried a test to dial back the number of executors on my cluster from 
using the entire cluster (256 cores) down to 128.  Things seemed to get a bit 
farther (maybe) before the wheels started spinning off again.  But, the job 
always fails when all I'm trying to do is save the 1TB file to S3.

I see the following in my master log file.

15/01/23 19:01:54 WARN master.Master: Removing worker-20150123172316 because 
we got no heartbeat in 60 seconds
15/01/23 19:01:54 INFO master.Master: Removing worker worker-20150123172316 on
15/01/23 19:01:54 INFO master.Master: Telling app of lost executor: 3

For the stage that eventually fails, I see the following summary information.

Summary Metrics for 729 Completed Tasks
Duration 2.5 min 4.8 min 5.5 min 6.3 min 9.2 min
GC Time   0 ms 0.3 s 0.4 s 0.5 s 5 s

Shuffle Read (Remote) 309.3 MB 321.7 MB 325.4 MB 329.6 MB 350.6 MB

So, the max GC was only 5s for 729 completed tasks.  This sounds reasonable.  
As people tend to indicate GC is the reason one loses executors, this does not 
appear to be my case.

Here is a typical snapshot for some completed tasks.  So, you can see that 
they tend to complete in approximately 6 minutes.  So, it takes about 6 
minutes to write one partition to S3 (a partition being roughly 1 GB)

65  23619   0   SUCCESS ANY 5 /  2015/01/23 18:30:32   
 5.8 min 0.9 s   344.6 MB
59  23613   0   SUCCESS ANY 7 /  2015/01/23 18:30:32   
 6.0 min 0.4 s   324.1 MB
68  23622   0   SUCCESS ANY 1 /  2015/01/23 18:30:32   
 5.7 min 0.5 s   329.9 MB
62  23616   0   SUCCESS ANY 6 /  2015/01/23 18:30:32   
 5.8 min 0.7 s   326.4 MB
61  23615   0   SUCCESS ANY 3 /  2015/01/23 18:30:32   
 5.5 min 1 s 335.7 MB
64  23618   0   SUCCESS ANY 2 /  2015/01/23 18:30:32   
 5.6 min 2 s 328.1 MB

Then towards the end, when things start heading south, I see the following.  
These tasks never complete but you can see that they have taken more than 47 
minutes (so far) before the job finally fails.  Not really sure why.

671 24225   0   RUNNING ANY 1 /  2015/01/23 18:59:14   
 47 min
672 24226   0   RUNNING ANY 1 /  2015/01/23 18:59:14   
 47 min
673 24227   0   RUNNING ANY 1 /  2015/01/23 

RE: spark 1.1.0 save data to hdfs failed

2015-01-23 Thread ey-chih chow
Thanks.  But I think I already marked all the Spark and Hadoop deps as provided.
Why is the cluster's version not used?
Anyway, as I mentioned in the previous message, after changing the
hadoop-client to version 1.2.1 in my maven deps, I get past that exception
and hit another one, as indicated below.  Any suggestion on this?
=
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
at 
org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.IncompatibleClassChangeError: Implementing class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at 
org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:73)
at 
org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.newTaskAttemptContext(SparkHadoopMapReduceUtil.scala:35)
at 
org.apache.spark.rdd.PairRDDFunctions.newTaskAttemptContext(PairRDDFunctions.scala:53)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:932)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832)
at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:103)
at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala)
        ... 6 more
 From: so...@cloudera.com
 Date: Fri, 23 Jan 2015 10:41:12 +
 Subject: Re: spark 1.1.0 save data to hdfs failed
 To: eyc...@hotmail.com
 CC: user@spark.apache.org
 
 So, you should not depend on Hadoop artifacts unless you use them
 directly. You should mark Hadoop and Spark deps as provided. Then the
 cluster's version is used at runtime with spark-submit. That's the
 usual way to do it, which works.
 
 If you need to embed Spark in your app and are running it outside the
 cluster for some reason, and you have to embed Hadoop and Spark code
 in your app, the version has to match. You should also use mvn
 dependency:tree to see all the dependencies coming in. There may be
 many sources of a Hadoop dep.
 
 On Fri, Jan 23, 2015 at 1:05 AM, ey-chih chow eyc...@hotmail.com wrote:
  Thanks.  But after I replace the maven dependence from
 
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.5.0-cdh5.2.0</version>
    <scope>provided</scope>
    <exclusions>
      <exclusion>
        <groupId>org.mortbay.jetty</groupId>
        <artifactId>servlet-api</artifactId>
      </exclusion>
      <exclusion>
        <groupId>javax.servlet</groupId>
        <artifactId>servlet-api</artifactId>
      </exclusion>
      <exclusion>
        <groupId>io.netty</groupId>
        <artifactId>netty</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
 
  to
 
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>1.0.4</version>
    <scope>provided</scope>
    <exclusions>
      <exclusion>
        <groupId>org.mortbay.jetty</groupId>
        <artifactId>servlet-api</artifactId>
      </exclusion>

Re: spark 1.1.0 save data to hdfs failed

2015-01-23 Thread Sean Owen
These are all definitely symptoms of mixing incompatible versions of libraries.

I'm not suggesting you haven't excluded Spark / Hadoop, but, this is
not the only way Hadoop deps get into your app. See my suggestion
about investigating the dependency tree.

On Fri, Jan 23, 2015 at 1:53 PM, ey-chih chow eyc...@hotmail.com wrote:
 Thanks.  But I think I already mark all the Spark and Hadoop reps as
 provided.  Why the cluster's version is not used?

 Any way, as I mentioned in the previous message, after changing the
 hadoop-client to version 1.2.1 in my maven deps, I already pass the
 exception and go to another one as indicated below.  Any suggestion on this?

 =

 Exception in thread main java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
 at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
 Caused by: java.lang.IncompatibleClassChangeError: Implementing class
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
 at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
 at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
 at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:191)
 at
 org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:73)
 at
 org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.newTaskAttemptContext(SparkHadoopMapReduceUtil.scala:35)
 at
 org.apache.spark.rdd.PairRDDFunctions.newTaskAttemptContext(PairRDDFunctions.scala:53)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:932)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832)
 at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:103)
 at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala)

 ... 6 more


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: spark 1.1.0 save data to hdfs failed

2015-01-23 Thread ey-chih chow
Sorry, I still did not quite get your resolution.  In my jar, there are the
following three related classes:

org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.class
org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl$DummyReporter.class
org/apache/hadoop/mapreduce/TaskAttemptContext.class

I think the first two come from hadoop2 and the third from hadoop1.  I would
like to get rid of the first two.  I checked my source code.  It does have a
place using the class (or interface in hadoop2) TaskAttemptContext.  Do you mean
I should make a separate jar for this portion of code, built with hadoop1, to
get rid of the dependency?  An alternative way is to modify the code in
SparkHadoopMapReduceUtil.scala and put it into my own source code to bypass the
problem.  Any comment on this?  Thanks.
From: eyc...@hotmail.com
To: so...@cloudera.com
CC: user@spark.apache.org
Subject: RE: spark 1.1.0 save data to hdfs failed
Date: Fri, 23 Jan 2015 11:17:36 -0800




Thanks.  I looked at the dependency tree.  I did not see any dependent jar of 
hadoop-core from hadoop2.  However the jar built from maven has the class:
 org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.class
Do you know why?



Date: Fri, 23 Jan 2015 17:01:48 +
Subject: RE: spark 1.1.0 save data to hdfs failed
From: so...@cloudera.com
To: eyc...@hotmail.com

Are you receiving my replies? I have suggested a resolution. Look at the 
dependency tree next. 
On Jan 23, 2015 2:43 PM, ey-chih chow eyc...@hotmail.com wrote:



I looked into the source code of SparkHadoopMapReduceUtil.scala. I think it is 
broken in the following code:
  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
    val klass = firstAvailableClass(
      "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",  // hadoop2, hadoop2-yarn
      "org.apache.hadoop.mapreduce.TaskAttemptContext")           // hadoop1
    val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
  }
In other words, it is related to hadoop2, hadoop2-yarn, and hadoop1.  Any 
suggestion how to resolve it?
Thanks.


 From: so...@cloudera.com
 Date: Fri, 23 Jan 2015 14:01:45 +
 Subject: Re: spark 1.1.0 save data to hdfs failed
 To: eyc...@hotmail.com
 CC: user@spark.apache.org
 
 These are all definitely symptoms of mixing incompatible versions of 
 libraries.
 
 I'm not suggesting you haven't excluded Spark / Hadoop, but, this is
 not the only way Hadoop deps get into your app. See my suggestion
 about investigating the dependency tree.
 
 On Fri, Jan 23, 2015 at 1:53 PM, ey-chih chow eyc...@hotmail.com wrote:
  Thanks.  But I think I already mark all the Spark and Hadoop reps as
  provided.  Why the cluster's version is not used?
 
  Any way, as I mentioned in the previous message, after changing the
  hadoop-client to version 1.2.1 in my maven deps, I already pass the
  exception and go to another one as indicated below.  Any suggestion on this?
 
  =
 
  Exception in thread main java.lang.reflect.InvocationTargetException
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
  sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at
  sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at
  org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
  at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
  Caused by: java.lang.IncompatibleClassChangeError: Implementing class
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
  at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:191)
  at
  org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:73)
  at
  org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.newTaskAttemptContext(SparkHadoopMapReduceUtil.scala:35)
  at
  org.apache.spark.rdd.PairRDDFunctions.newTaskAttemptContext(PairRDDFunctions.scala:53)
  at
  org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:932)
  at
  

Re: Large number of pyspark.daemon processes

2015-01-23 Thread Sandy Ryza
Hi Sven,

What version of Spark are you running?  Recent versions have a change that
allows PySpark to share a pool of processes instead of starting a new one
for each task.

-Sandy

On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser kras...@gmail.com wrote:

 Hey all,

 I am running into a problem where YARN kills containers for being over
 their memory allocation (which is about 8G for executors plus 6G for
 overhead), and I noticed that in those containers there are tons of
 pyspark.daemon processes hogging memory. Here's a snippet from a container
 with 97 pyspark.daemon processes. The total sum of RSS usage across all of
 these is 1,764,956 pages (i.e. 6.7GB on the system).

 Any ideas what's happening here and how I can get the number of
 pyspark.daemon processes back to a more reasonable count?

 2015-01-23 15:36:53,654 INFO  [Reporter] yarn.YarnAllocationHandler 
 (Logging.scala:logInfo(59)) - Container marked as failed: 
 container_1421692415636_0052_01_30. Exit status: 143. Diagnostics: 
 Container [pid=35211,containerID=container_1421692415636_0052_01_30] is 
 running beyond physical memory limits. Current usage: 14.9 GB of 14.5 GB 
 physical memory used; 41.3 GB of 72.5 GB virtual memory used. Killing 
 container.
 Dump of the process-tree for container_1421692415636_0052_01_30 :
   |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
   |- 54101 36625 36625 35211 (python) 78 1 332730368 16834 python -m 
 pyspark.daemon
   |- 52140 36625 36625 35211 (python) 58 1 332730368 16837 python -m 
 pyspark.daemon
   |- 36625 35228 36625 35211 (python) 65 604 331685888 17694 python -m 
 pyspark.daemon

   [...]


 Full output here: https://gist.github.com/skrasser/e3e2ee8dede5ef6b082c

 Thank you!
 -Sven

 --
 http://sites.google.com/site/krasser/?utm_source=sig



Re: How to replay consuming messages from kafka using spark streaming?

2015-01-23 Thread mykidong
Hi,

I have written spark streaming kafka receiver using kafka simple consumer
api:
https://github.com/mykidong/spark-kafka-simple-consumer-receiver

This kafka receiver can be used as alternative to the current spark
streaming kafka receiver which is just written in high level kafka consumer
api.

With this kafka receiver, the kafka message offset control can be done more
easier for Receiver Woker Node Failure and Driver Node Failure.

- Kidong.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-replay-consuming-messages-from-kafka-using-spark-streaming-tp21145p21343.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Streaming: getting data from Cassandra based on input stream values

2015-01-23 Thread madhu phatak
Hi,
In that case, you can try the following.

val joinRDD = kafkaStream.transform(streamRDD => {

  val ids = streamRDD.map(_._2).collect()

  val userNames = ids.map(userId =>
    ctable.select("user_name").where("userid = ?", userId).toArray(0).get[String](0))

  // better: build a single query which checks for all those ids at the same time
  streamRDD.sparkContext.parallelize(ids.zip(userNames))
})


On Sat, Jan 24, 2015 at 3:32 AM, Greg Temchenko s...@dicefield.com wrote:

  Hi Madhu,
 Thanks for you response!
 But as I understand in this case you select all data from the Cassandra
 table. I don't wanna do it as it can be huge. I wanna just lookup some ids
 in the table. So it doesn't make sense for me how I can put some values
 from the streamRDD to the cassandra query (to where method).

 Greg



 On 1/23/15 1:11 AM, madhu phatak wrote:

 Hi,
 Seems like you want to get the username for a given user id. You can use
 transform on the kafka stream to join two RDDs. The pseudo code looks like
 this

  val joinRDD = kafkaStream.transform( streamRDD => {

  streamRDD.map(value => (value._2, value._1)) join with
  (ctable.select("userid", "username"))

  })

 On Fri, Jan 23, 2015 at 10:12 AM, Greg Temchenko s...@dicefield.com
 wrote:

  Hi there,

 I think I have a basic question, but I'm sort of stuck with figuring out
 how to approach it, and I thought someone could point me to the right
 direction.

 I'd like pull some data from Cassandra based on values received from an
 input stream. Something like

 val ctable = ssc.cassandraTable("keyspace", "users")
 val userNames = kafkaStream.flatMap {
   case (key, userId) => {
     val userName = ctable.select("user_name").where("userid = ?",
       userId).toArray(0).get[String](0)
     Some(userId, userName)
   }
 }


 While the Cassandra query works in Spark shell, it throws an exception
 when I used it inside flatMap:

 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent
 failure: Lost task 0.0 in stage 46.0 (TID 35, localhost):
 java.lang.NullPointerException:
 org.apache.spark.rdd.RDD.<init>(RDD.scala:125)

 com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49)

 com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83)

 com.datastax.spark.connector.rdd.CassandraRDD.select(CassandraRDD.scala:143)

 My understanding is that I cannot produce an RDD (Cassandra results)
 inside another RDD. But how should I approach the problem instead?



 Thanks,

 --
 Greg




  --
  Regards,
 Madhukara Phatak
 http://www.madhukaraphatak.com



 --
 Greg




-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Re: While Loop

2015-01-23 Thread Ted Yu
Can you tell us the problem you're facing ?

Please see this thread:
http://search-hadoop.com/m/JW1q5SsB5m

Cheers

On Fri, Jan 23, 2015 at 9:02 PM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Hi,
 Is there a better programming construct than while loop in Spark?

 Thank You



While Loop

2015-01-23 Thread Deep Pradhan
Hi,
Is there a better programming construct than while loop in Spark?

Thank You


Re: Large number of pyspark.daemon processes

2015-01-23 Thread Adam Diaz
Yarn only has the ability to kill not checkpoint or sig suspend.  If you
use too much memory it will simply kill tasks based upon the yarn config.
https://issues.apache.org/jira/browse/YARN-2172


On Friday, January 23, 2015, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Sven,

 What version of Spark are you running?  Recent versions have a change that
 allows PySpark to share a pool of processes instead of starting a new one
 for each task.

 -Sandy

 On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser kras...@gmail.com wrote:

 Hey all,

 I am running into a problem where YARN kills containers for being over
 their memory allocation (which is about 8G for executors plus 6G for
 overhead), and I noticed that in those containers there are tons of
 pyspark.daemon processes hogging memory. Here's a snippet from a container
 with 97 pyspark.daemon processes. The total sum of RSS usage across all of
 these is 1,764,956 pages (i.e. 6.7GB on the system).

 Any ideas what's happening here and how I can get the number of
 pyspark.daemon processes back to a more reasonable count?

 2015-01-23 15:36:53,654 INFO  [Reporter] yarn.YarnAllocationHandler 
 (Logging.scala:logInfo(59)) - Container marked as failed: 
 container_1421692415636_0052_01_30. Exit status: 143. Diagnostics: 
 Container [pid=35211,containerID=container_1421692415636_0052_01_30] is 
 running beyond physical memory limits. Current usage: 14.9 GB of 14.5 GB 
 physical memory used; 41.3 GB of 72.5 GB virtual memory used. Killing 
 container.
 Dump of the process-tree for container_1421692415636_0052_01_30 :
  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
  |- 54101 36625 36625 35211 (python) 78 1 332730368 16834 python -m 
 pyspark.daemon
  |- 52140 36625 36625 35211 (python) 58 1 332730368 16837 python -m 
 pyspark.daemon
  |- 36625 35228 36625 35211 (python) 65 604 331685888 17694 python -m 
 pyspark.daemon

  [...]


 Full output here: https://gist.github.com/skrasser/e3e2ee8dede5ef6b082c

 Thank you!
 -Sven

 --
 krasser http://sites.google.com/site/krasser/?utm_source=sig





Re: Aggregations based on sort order

2015-01-23 Thread Imran Rashid
I'm not sure about this, but I suspect the answer is:  spark doesn't
guarantee a stable sort, nor does it plan to in the future, so the
implementation has more flexibility.

But you might be interested in the work being done on secondary sort, which
could give you the guarantees you want:
https://issues.apache.org/jira/browse/SPARK-3655
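
For what it's worth, one way to get that ordering guarantee on Spark 1.2 today
is repartitionAndSortWithinPartitions with a composite key: partition on the
key alone, and let the implicit tuple ordering sort by (key, date) inside each
partition. A rough sketch only, assuming (key, date, payload) records and an
existing SparkContext sc:

import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._  // OrderedRDDFunctions implicits on 1.2

// Partition on the primary key only, so all records for a key land in the
// same partition; the (key, date) ordering then sorts them by date within it.
class KeyPartitioner(val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case (k: String, _) => (k.hashCode & Int.MaxValue) % numPartitions
  }
}

val data = sc.parallelize(Seq(("A", 2, "..."), ("B", 4, "..."), ("A", 1, "...")))
val sortedWithinKeys = data
  .map { case (key, date, payload) => ((key, date), payload) }
  .repartitionAndSortWithinPartitions(new KeyPartitioner(4))
// a downstream mapPartitions now sees each key's values in date order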
On Jan 19, 2015 4:52 PM, justin.uang justin.u...@gmail.com wrote:

 Hi,

 I am trying to aggregate a key based on some timestamp, and I believe that
 spilling to disk is changing the order of the data fed into the combiner.

 I have some timeseries data that is of the form: (key, date, other
 data)

 Partition 1
 (A, 2, ...)
 (B, 4, ...)
 (A, 1, ...)
 (A, 3, ...)
 (B, 6, ...)

 which I then partition by key, then sort within the partition:

 Partition 1
 (A, 1, ...)
 (A, 2, ...)
 (A, 3, ...)
 (A, 4, ...)

 Partition 2
 (B, 4, ...)
 (B, 6, ...)

 If I run a combineByKey with the same partitioner, then the items for each
 key will be fed into the ExternalAppendOnlyMap in the correct order.
 However, if I spill, then the time slices are spilled to disk as multiple
 partial combiners. When its time to merge the spilled combiners for each
 key, the combiners are combined in the wrong order.

 For example, if during a groupByKey, [(A, 1, ...), (A, 2...)] and
 [(A,
 3, ...), (A, 4, ...)] are spilled separately, it's possible that the
 combiners can be combined in the wrong order, like [(A, 3, ...), (A, 4,
 ...), (A, 1, ...), (A, 2, ...)], which invalidates the invariant that
 all the values for A are passed in order to the combiners.

 I'm not an expert, but I suspect that this is because we use a heap ordered
 by key when iterating, which doesn't retain the order the spilled
 combiners.
 Perhaps we can order our mergeHeap by (hash_key, spill_index), where
 spill_index is incremented each time we spill? This would mean that we
 would
 pop and merge the combiners of each key in order, resulting in [(A, 1,
 ...), (A, 2, ...), (A, 3, ...), (A, 4, ...)].

 Thanks in advance for the help! If there is a way to do this already in
 Spark 1.2, can someone point it out to me?

 Best,

 Justin



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Aggregations-based-on-sort-order-tp21245.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Starting a spark streaming app in init.d

2015-01-23 Thread Akhil Das
I'd do the same, but add an extra condition that checks whether the job has
successfully started by probing the application UI (checking that port 4040 is
listening would do; if you want something more robust, write a parser for the
UI) after putting the main script to sleep for some time (say 2 minutes).

Thanks
Best Regards

On Sat, Jan 24, 2015 at 1:57 AM, Ashic Mahtab as...@live.com wrote:

 Hello,
 I'm trying to kick off a spark streaming job to a stand alone master using
 spark submit inside of init.d. This is what I have:


 DAEMON="spark-submit --class Streamer --executor-memory 500M --total-executor-cores 4 /path/to/assembly.jar"

 start() {
     $DAEMON -p /var/run/my_assembly.pid &
     echo "OK"
     return 0
 }

 However, this will return 0 even if spark-submit fails. Is there a way to run
 spark-submit in the background and return 0 only if it successfully starts
 up? Or better yet, is there something in spark-submit that will allow me to
 do this, perhaps via a command line argument?

 Thanks,
 Ashic.



Re: Large number of pyspark.daemon processes

2015-01-23 Thread Sven Krasser
Hey Adam,

I'm not sure I understand just yet what you have in mind. My takeaway from
the logs is that the container actually was above its allotment of about
14G. Since 6G of that are for overhead, I assumed there to be plenty of
space for Python workers, but there seem to be more of those than I'd
expect.

Does anyone know if that is actually the intended behavior, i.e. in this
case over 90 Python processes on a 2 core executor?

Best,
-Sven


On Fri, Jan 23, 2015 at 10:04 PM, Adam Diaz adam.h.d...@gmail.com wrote:

 Yarn only has the ability to kill not checkpoint or sig suspend.  If you
 use too much memory it will simply kill tasks based upon the yarn config.
 https://issues.apache.org/jira/browse/YARN-2172


 On Friday, January 23, 2015, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Sven,

 What version of Spark are you running?  Recent versions have a change
 that allows PySpark to share a pool of processes instead of starting a new
 one for each task.

 -Sandy

 On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser kras...@gmail.com wrote:

 Hey all,

 I am running into a problem where YARN kills containers for being over
 their memory allocation (which is about 8G for executors plus 6G for
 overhead), and I noticed that in those containers there are tons of
 pyspark.daemon processes hogging memory. Here's a snippet from a container
 with 97 pyspark.daemon processes. The total sum of RSS usage across all of
 these is 1,764,956 pages (i.e. 6.7GB on the system).

 Any ideas what's happening here and how I can get the number of
 pyspark.daemon processes back to a more reasonable count?

 2015-01-23 15:36:53,654 INFO  [Reporter] yarn.YarnAllocationHandler 
 (Logging.scala:logInfo(59)) - Container marked as failed: 
 container_1421692415636_0052_01_30. Exit status: 143. Diagnostics: 
 Container [pid=35211,containerID=container_1421692415636_0052_01_30] is 
 running beyond physical memory limits. Current usage: 14.9 GB of 14.5 GB 
 physical memory used; 41.3 GB of 72.5 GB virtual memory used. Killing 
 container.
 Dump of the process-tree for container_1421692415636_0052_01_30 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 54101 36625 36625 35211 (python) 78 1 332730368 16834 python -m 
 pyspark.daemon
 |- 52140 36625 36625 35211 (python) 58 1 332730368 16837 python -m 
 pyspark.daemon
 |- 36625 35228 36625 35211 (python) 65 604 331685888 17694 python -m 
 pyspark.daemon

 [...]


 Full output here: https://gist.github.com/skrasser/e3e2ee8dede5ef6b082c

 Thank you!
 -Sven

 --
 krasser http://sites.google.com/site/krasser/?utm_source=sig





-- 
http://sites.google.com/site/krasser/?utm_source=sig


Re: Large number of pyspark.daemon processes

2015-01-23 Thread Davies Liu
It should be a bug, the Python worker did not exit normally, could you
file a JIRA for this?

Also, could you show how to reproduce this behavior?

On Fri, Jan 23, 2015 at 11:45 PM, Sven Krasser kras...@gmail.com wrote:
 Hey Adam,

 I'm not sure I understand just yet what you have in mind. My takeaway from
 the logs is that the container actually was above its allotment of about
 14G. Since 6G of that are for overhead, I assumed there to be plenty of
 space for Python workers, but there seem to be more of those than I'd
 expect.

 Does anyone know if that is actually the intended behavior, i.e. in this
 case over 90 Python processes on a 2 core executor?

 Best,
 -Sven


 On Fri, Jan 23, 2015 at 10:04 PM, Adam Diaz adam.h.d...@gmail.com wrote:

 Yarn only has the ability to kill not checkpoint or sig suspend.  If you
 use too much memory it will simply kill tasks based upon the yarn config.
 https://issues.apache.org/jira/browse/YARN-2172


 On Friday, January 23, 2015, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Sven,

 What version of Spark are you running?  Recent versions have a change
 that allows PySpark to share a pool of processes instead of starting a new
 one for each task.

 -Sandy

 On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser kras...@gmail.com wrote:

 Hey all,

 I am running into a problem where YARN kills containers for being over
 their memory allocation (which is about 8G for executors plus 6G for
 overhead), and I noticed that in those containers there are tons of
 pyspark.daemon processes hogging memory. Here's a snippet from a container
 with 97 pyspark.daemon processes. The total sum of RSS usage across all of
 these is 1,764,956 pages (i.e. 6.7GB on the system).

 Any ideas what's happening here and how I can get the number of
 pyspark.daemon processes back to a more reasonable count?

 2015-01-23 15:36:53,654 INFO  [Reporter] yarn.YarnAllocationHandler
 (Logging.scala:logInfo(59)) - Container marked as failed:
 container_1421692415636_0052_01_30. Exit status: 143. Diagnostics:
 Container [pid=35211,containerID=container_1421692415636_0052_01_30] is
 running beyond physical memory limits. Current usage: 14.9 GB of 14.5 GB
 physical memory used; 41.3 GB of 72.5 GB virtual memory used. Killing
 container.
 Dump of the process-tree for container_1421692415636_0052_01_30 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 54101 36625 36625 35211 (python) 78 1 332730368 16834 python -m
 pyspark.daemon
|- 52140 36625 36625 35211 (python) 58 1 332730368 16837 python -m
 pyspark.daemon
|- 36625 35228 36625 35211 (python) 65 604 331685888 17694 python -m
 pyspark.daemon

[...]


 Full output here: https://gist.github.com/skrasser/e3e2ee8dede5ef6b082c

 Thank you!
 -Sven

 --
 krasser





 --
 http://sites.google.com/site/krasser/?utm_source=sig

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problems saving a large RDD (1 TB) to S3 as a sequence file

2015-01-23 Thread Akhil Das
Can you also try increasing the akka framesize?

.set("spark.akka.frameSize", "50") // Set it to a higher number


Thanks
Best Regards

On Sat, Jan 24, 2015 at 3:58 AM, Darin McBeath ddmcbe...@yahoo.com.invalid
wrote:

 Thanks for the ideas Sven.

 I'm using stand-alone cluster (Spark 1.2).
 FWIW, I was able to get this running (just now).  This is the first time
 it's worked in probably my last 10 attempts.

 In addition to limiting the executors to only 50% of the cluster.  In the
 settings below, I additionally added/changed  the following.  Maybe, I just
 got lucky (although I think not).  Would be good if someone could weigh in
 and agree that these changes are sensible.  I'm also hoping the support for
 placement groups (targeted for 1.3 in the ec2 scripts) will help the
 situation.  All in all, it takes about 45 minutes to write a 1 TB file back
 to S3 (as 1024 partitions).


 SparkConf conf = new SparkConf()
 .setAppName("SparkSync Application")
 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 .set("spark.rdd.compress", "true")
 .set("spark.core.connection.ack.wait.timeout", "600")
 .set("spark.akka.timeout", "600")             // Increased from 300
 .set("spark.akka.threads", "16")              // Added so that default was increased from 4 to 16
 .set("spark.task.maxFailures", "64")          // Didn't really matter as I had no failures in this run
 .set("spark.storage.blockManagerSlaveTimeoutMs", "30");


 
 From: Sven Krasser kras...@gmail.com
 To: Darin McBeath ddmcbe...@yahoo.com
 Cc: User user@spark.apache.org
 Sent: Friday, January 23, 2015 5:12 PM
 Subject: Re: Problems saving a large RDD (1 TB) to S3 as a sequence file



 Hey Darin,

 Are you running this over EMR or as a standalone cluster? I've had
 occasional success in similar cases by digging through all executor logs
 and trying to find exceptions that are not caused by the application
 shutdown (but the logs remain my main pain point with Spark).

 That aside, another explanation could be S3 throttling you due to volume
 (and hence causing write requests to fail). You can try to split your file
 into multiple pieces and store those as S3 objects with different prefixes
 to make sure they end up in different partitions in S3. See here for
 details:
 http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html.
 If that works, that'll narrow the cause down.

 Best,
 -Sven






 On Fri, Jan 23, 2015 at 12:04 PM, Darin McBeath
 ddmcbe...@yahoo.com.invalid wrote:

 I've tried various ideas, but I'm really just shooting in the dark.
 
 I have an 8 node cluster of r3.8xlarge machines. The RDD (with 1024
 partitions) I'm trying to save off to S3 is approximately 1TB in size (with
 the partitions pretty evenly distributed in size).
 
 I just tried a test to dial back the number of executors on my cluster
 from using the entire cluster (256 cores) down to 128.  Things seemed to
 get a bit farther (maybe) before the wheels started spinning off again.
 But, the job always fails when all I'm trying to do is save the 1TB file to
 S3.
 
 I see the following in my master log file.
 
 15/01/23 19:01:54 WARN master.Master: Removing worker-20150123172316
 because we got no heartbeat in 60 seconds
 15/01/23 19:01:54 INFO master.Master: Removing worker
 worker-20150123172316 on
 15/01/23 19:01:54 INFO master.Master: Telling app of lost executor: 3
 
 For the stage that eventually fails, I see the following summary
 information.
 
  Summary Metrics for 729 Completed Tasks
                         Min       25th      Median    75th      Max
  Duration               2.5 min   4.8 min   5.5 min   6.3 min   9.2 min
  GC Time                0 ms      0.3 s     0.4 s     0.5 s     5 s

  Shuffle Read (Remote)  309.3 MB  321.7 MB  325.4 MB  329.6 MB  350.6 MB
 
 So, the max GC was only 5s for 729 completed tasks.  This sounds
 reasonable.  As people tend to indicate GC is the reason one loses
 executors, this does not appear to be my case.
 
 Here is a typical snapshot for some completed tasks.  So, you can see
 that they tend to complete in approximately 6 minutes.  So, it takes about
 6 minutes to write one partition to S3 (a partition being roughly 1 GB)
 
 65  23619   0   SUCCESS ANY 5 /  2015/01/23 18:30:32
   5.8 min 0.9 s   344.6 MB
 59  23613   0   SUCCESS ANY 7 /  2015/01/23 18:30:32
   6.0 min 0.4 s   324.1 MB
 68  23622   0   SUCCESS ANY 1 /  2015/01/23 18:30:32
   5.7 min 0.5 s   329.9 MB
 62  23616   0   SUCCESS ANY 6 /  2015/01/23 18:30:32
   5.8 min 0.7 s   326.4 MB
 61  23615   0   SUCCESS ANY 3 /  2015/01/23 18:30:32
   5.5 min 1 s 335.7 MB
 64  23618   0   SUCCESS ANY 2 /  2015/01/23 18:30:32
   5.6 min 2 s 328.1 MB
 
 Then towards the end, when things start heading south, I see the
 following.  These tasks never complete but you can see that they have taken
 more than 47 minutes (so far) before the job finally fails.  Not 

Re: Large number of pyspark.daemon processes

2015-01-23 Thread Sven Krasser
Hey Sandy,

I'm using Spark 1.2.0. I assume you're referring to worker reuse? In this
case I've already set spark.python.worker.reuse to false (but I also saw this
behavior when keeping it enabled).

Best,
-Sven


On Fri, Jan 23, 2015 at 4:51 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Sven,

 What version of Spark are you running?  Recent versions have a change that
 allows PySpark to share a pool of processes instead of starting a new one
 for each task.

 -Sandy

 On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser kras...@gmail.com wrote:

 Hey all,

 I am running into a problem where YARN kills containers for being over
 their memory allocation (which is about 8G for executors plus 6G for
 overhead), and I noticed that in those containers there are tons of
 pyspark.daemon processes hogging memory. Here's a snippet from a container
 with 97 pyspark.daemon processes. The total sum of RSS usage across all of
 these is 1,764,956 pages (i.e. 6.7GB on the system).

 Any ideas what's happening here and how I can get the number of
 pyspark.daemon processes back to a more reasonable count?

 2015-01-23 15:36:53,654 INFO  [Reporter] yarn.YarnAllocationHandler 
 (Logging.scala:logInfo(59)) - Container marked as failed: 
 container_1421692415636_0052_01_30. Exit status: 143. Diagnostics: 
 Container [pid=35211,containerID=container_1421692415636_0052_01_30] is 
 running beyond physical memory limits. Current usage: 14.9 GB of 14.5 GB 
 physical memory used; 41.3 GB of 72.5 GB virtual memory used. Killing 
 container.
 Dump of the process-tree for container_1421692415636_0052_01_30 :
  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
  |- 54101 36625 36625 35211 (python) 78 1 332730368 16834 python -m 
 pyspark.daemon
  |- 52140 36625 36625 35211 (python) 58 1 332730368 16837 python -m 
 pyspark.daemon
  |- 36625 35228 36625 35211 (python) 65 604 331685888 17694 python -m 
 pyspark.daemon

  [...]


 Full output here: https://gist.github.com/skrasser/e3e2ee8dede5ef6b082c

 Thank you!
 -Sven

 --
 http://sites.google.com/site/krasser/?utm_source=sig





-- 
http://sites.google.com/site/krasser/?utm_source=sig


Re: Spark Streaming action not triggered with Kafka inputs

2015-01-23 Thread Akhil Das
ssc.union will return a DStream, you should do something like:

val lines = ssc.union(parallelInputs)
lines.print()
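
Spelled out against the snippet from your mail (just a sketch; the usual
ssc.start() / ssc.awaitTermination() are assumed, since without an output
operation plus start() nothing is triggered):

val topicMap = topics.split(",").map((_, 1)).toMap
val parallelInputs = (1 to numThreads.toInt).map(_ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2))
val lines = ssc.union(parallelInputs)
lines.print()  // output operation, so the job actually runs
ssc.start()
ssc.awaitTermination()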


Thanks
Best Regards

On Sat, Jan 24, 2015 at 12:55 AM, Chen Song chen.song...@gmail.com wrote:

 I am running into some problems with Spark Streaming when reading from
 Kafka. I used Spark 1.2.0 built on CDH5.
 The example is based on:

 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
 * It works with the default implementation.
 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
 val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
 topicMap).map(_._2)

 * However, when I changed it to parallel receiving, as shown below

 val topicMap = topics.split(",").map((_, 1)).toMap
 val parallelInputs = (1 to numThreads.toInt) map { _ => KafkaUtils
 .createStream(ssc, zkQuorum, group, topicMap)

 }

 ssc.union(parallelInputs)
 After the change, the job stage just hangs there and never finishes. It looks
 like no action is triggered on the streaming job. When I check the
 Streaming tab, it shows the messages below:
 Batch Processing Statistics

No statistics have been generated yet.


 Am I doing anything wrong on the parallel receiving part?

 --
 Chen Song




Re: Installing Spark Standalone to a Cluster

2015-01-23 Thread Akhil Das
Which variable is that you don't understand?

Here's a minimalistic spark-env.sh of mine.

export SPARK_MASTER_IP=192.168.10.28

export HADOOP_CONF_DIR=/home/akhld/sigmoid/localcluster/hadoop/conf
export HADOOP_HOME=/home/akhld/sigmoid/localcluster/hadoop/



Thanks
Best Regards

On Fri, Jan 23, 2015 at 11:50 PM, riginos samarasrigi...@gmail.com wrote:

 I need someone to send me a snapshot of his /conf/spark-env.sh file cause i
 don't understand how to set some vars like SPARK_MASTER etc



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Installing-Spark-Standalone-to-a-Cluster-tp21341.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




RE: spark 1.1.0 save data to hdfs failed

2015-01-23 Thread ey-chih chow
After I changed the dependency to the following:
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>1.2.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.mortbay.jetty</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
    <exclusion>
      <groupId>io.netty</groupId>
      <artifactId>netty</artifactId>
    </exclusion>
  </exclusions>
</dependency>
I got the following error.  Any idea on this?  Thanks.
===
Caused by: java.lang.IncompatibleClassChangeError: Implementing class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at 
org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:73)
at 
org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.newTaskAttemptContext(SparkHadoopMapReduceUtil.scala:35)
at 
org.apache.spark.rdd.PairRDDFunctions.newTaskAttemptContext(PairRDDFunctions.scala:53)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:932)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832)
at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:103)
at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala)
... 6 more

From: eyc...@hotmail.com
To: so...@cloudera.com
CC: yuzhih...@gmail.com; user@spark.apache.org
Subject: RE: spark 1.1.0 save data to hdfs failed
Date: Thu, 22 Jan 2015 17:05:26 -0800




Thanks.  But after I replace the maven dependency from

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.5.0-cdh5.2.0</version>
  <scope>provided</scope>
  <exclusions>
    <exclusion>
      <groupId>org.mortbay.jetty</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
    <exclusion>
      <groupId>io.netty</groupId>
      <artifactId>netty</artifactId>
    </exclusion>
  </exclusions>
</dependency>

to

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>1.0.4</version>
  <scope>provided</scope>
  <exclusions>
    <exclusion>
      <groupId>org.mortbay.jetty</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
    <exclusion>
      <groupId>io.netty</groupId>
      <artifactId>netty</artifactId>
    </exclusion>
  </exclusions>
</dependency>

the warning message still shows up in the namenode log.  Is there anything
else I need to do?


Thanks.


Ey-Chih Chow


 From: so...@cloudera.com
 Date: Thu, 22 Jan 2015 22:34:22 +
 Subject: Re: 

Re: processing large dataset

2015-01-23 Thread Sean Owen
This is kinda a how-long-is-a-piece-of-string question. There is no
one tuning for 'terabytes of data'. You can easily run a Spark job
that processes hundreds of terabytes with no problem with defaults --
something trivial like counting. You can create Spark jobs that will
never complete -- trying to pull the entire data set into a worker.

You haven't said what you're doing exactly, although it sounds simple,
and you haven't said what the problem is. Is it out of memory? That would
be essential to know in order to say what, if anything, you need to change in
your program or cluster.

On Fri, Jan 23, 2015 at 4:52 AM, Kane Kim kane.ist...@gmail.com wrote:
 I'm trying to process 5TB of data, not doing anything fancy, just
 map/filter and reduceByKey. Spent whole day today trying to get it
 processed, but never succeeded. I've tried to deploy to ec2 with the
 script provided with spark on pretty beefy machines (100 r3.2xlarge
 nodes). Really frustrated that spark doesn't work out of the box for
 anything bigger than word count sample. One big problem is that
 defaults are not suitable for processing big datasets, provided ec2
 script could do a better job, knowing instance type requested. Second
 it takes hours to figure out what is wrong, when spark job fails
 almost finished processing. Even after raising all limits as per
 https://spark.apache.org/docs/latest/tuning.html it still fails (now
 with: error communicating with MapOutputTracker).

 After all I have only one question - how to get spark tuned up for
 processing terabytes of data and is there a way to make this
 configuration easier and more transparent?

 Thanks.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: save a histogram to a file

2015-01-23 Thread madhu phatak
Hi,
The histogram method returns normal Scala types, not an RDD, so you will not
have saveAsTextFile on the result.
You can use the makeRDD method to make an RDD out of the data and then call
saveAsObjectFile:

val hist = a.histogram(10)
val histRDD = sc.makeRDD(hist._1.zip(hist._2))
histRDD.saveAsObjectFile(path)


On Fri, Jan 23, 2015 at 5:37 AM, SK skrishna...@gmail.com wrote:

 Hi,
 histogram() returns an object that is a  pair of Arrays. There appears to
 be
 no saveAsTextFile() for this paired object.

 Currently I am using the following to save the output to a file:

 val hist = a.histogram(10)

 val arr1 = sc.parallelize(hist._1).saveAsTextFile(file1)
 val arr2 = sc.parallelize(hist._2).saveAsTextFile(file2)

 Is there a simpler way to save the histogram() result to a file?

 thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/save-a-histogram-to-a-file-tp21324.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Re: spark 1.1.0 save data to hdfs failed

2015-01-23 Thread Sean Owen
So, you should not depend on Hadoop artifacts unless you use them
directly. You should mark Hadoop and Spark deps as provided. Then the
cluster's version is used at runtime with spark-submit. That's the
usual way to do it, which works.

If you need to embed Spark in your app and are running it outside the
cluster for some reason, and you have to embed Hadoop and Spark code
in your app, the version has to match. You should also use mvn
dependency:tree to see all the dependencies coming in. There may be
many sources of a Hadoop dep.
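
If the job happens to be built with sbt instead of Maven, the same idea is a
pair of one-liners (a sketch only; use the versions your cluster actually runs,
and note the CDH artifact also needs the Cloudera repository as a resolver):

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.5.0-cdh5.2.0" % "provided"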

On Fri, Jan 23, 2015 at 1:05 AM, ey-chih chow eyc...@hotmail.com wrote:
 Thanks.  But after I replace the maven dependence from

 <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>2.5.0-cdh5.2.0</version>
   <scope>provided</scope>
   <exclusions>
     <exclusion>
       <groupId>org.mortbay.jetty</groupId>
       <artifactId>servlet-api</artifactId>
     </exclusion>
     <exclusion>
       <groupId>javax.servlet</groupId>
       <artifactId>servlet-api</artifactId>
     </exclusion>
     <exclusion>
       <groupId>io.netty</groupId>
       <artifactId>netty</artifactId>
     </exclusion>
   </exclusions>
 </dependency>

 to

 <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>1.0.4</version>
   <scope>provided</scope>
   <exclusions>
     <exclusion>
       <groupId>org.mortbay.jetty</groupId>
       <artifactId>servlet-api</artifactId>
     </exclusion>
     <exclusion>
       <groupId>javax.servlet</groupId>
       <artifactId>servlet-api</artifactId>
     </exclusion>
     <exclusion>
       <groupId>io.netty</groupId>
       <artifactId>netty</artifactId>
     </exclusion>
   </exclusions>
 </dependency>


 the warning message is still shown up in the namenode log.  Is there any
 other thing I need to do?


 Thanks.


 Ey-Chih Chow



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Got java.lang.SecurityException: class javax.servlet.FilterRegistration's when running job from intellij Idea

2015-01-23 Thread Marco
Hi,

I've exactly the same issue. I've tried to mark the libraries as 'provided',
but then the IntelliJ IDE seems to have deleted the libraries locally... that is,
I am not able to build/run the stuff in the IDE.

Is the issue resolved? I'm not very experienced in SBT... I've tried to
exclude the libraries:

name := "SparkDemo"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0" exclude("org.apache.hadoop", "hadoop-client")

libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.2.0"

libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.6.0" excludeAll(
  ExclusionRule(organization = "org.eclipse.jetty"))

libraryDependencies += "org.apache.hadoop" % "hadoop-mapreduce-client-core" % "2.6.0"

libraryDependencies += "org.apache.hbase" % "hbase-client" % "0.98.4-hadoop2"

libraryDependencies += "org.apache.hbase" % "hbase-server" % "0.98.4-hadoop2"

libraryDependencies += "org.apache.hbase" % "hbase-common" % "0.98.4-hadoop2"

mainClass in Compile := Some("demo.TruckEvents")

but this does not work either.

Thanks,
Marco



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Got-java-lang-SecurityException-class-javax-servlet-FilterRegistration-s-when-running-job-from-intela-tp18035p21332.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Got java.lang.SecurityException: class javax.servlet.FilterRegistration's when running job from intellij Idea

2015-01-23 Thread Sean Owen
Use mvn dependency:tree or sbt dependency-tree to print all of the
dependencies. You are probably bringing in more servlet API libs from
some other source?
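
One common culprit in builds like this is the old servlet-api 2.5 that the
Hadoop and HBase artifacts drag in, which clashes with the servlet 3.0 classes
used by Spark's Jetty. A sketch of the usual exclusion, not verified against
this exact build:

libraryDependencies += "org.apache.hbase" % "hbase-server" % "0.98.4-hadoop2" excludeAll(
  ExclusionRule(organization = "org.mortbay.jetty"),
  ExclusionRule(organization = "javax.servlet"))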

On Fri, Jan 23, 2015 at 10:57 AM, Marco marco@gmail.com wrote:
 Hi,

 I've exactly the same issue. I've tried to mark the libraries as 'provided'
 but then IntelliJ IDE seems to have deleted the libraries locallythat is
 I am not able to build/run the stuff in the IDE.

 Is the issue resolved ? I'm not very experienced in SBTI've tried to
 exclude the libraries:

 /name := SparkDemo

 version := 1.0

 scalaVersion := 2.10.4


 libraryDependencies += org.apache.spark %% spark-core % 1.2.0
 exclude(org.apache.hadoop, hadoop-client)

 libraryDependencies += org.apache.spark % spark-sql_2.10 % 1.2.0


 libraryDependencies += org.apache.hadoop % hadoop-common % 2.6.0
 excludeAll(
 ExclusionRule(organization = org.eclipse.jetty))

 libraryDependencies += org.apache.hadoop % hadoop-mapreduce-client-core
 % 2.6.0


 libraryDependencies += org.apache.hbase % hbase-client %
 0.98.4-hadoop2

 libraryDependencies += org.apache.hbase % hbase-server %
 0.98.4-hadoop2

 libraryDependencies += org.apache.hbase % hbase-common %
 0.98.4-hadoop2

 mainClass in Compile := Some(demo.TruckEvents)/

 but this does not work also.

 Thanks,
 Marco



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Got-java-lang-SecurityException-class-javax-servlet-FilterRegistration-s-when-running-job-from-intela-tp18035p21332.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext

2015-01-23 Thread Nishant Patel
Below is code I have written. I am getting NotSerializableException. How
can I handle this scenario?

kafkaStream.foreachRDD(rdd => {
  println()
  rdd.foreachPartition(partitionOfRecords => {
    partitionOfRecords.foreach(
      record => {

        // Write for CSV.
        if (true == true) {

          val structType = table.schema
          val csvFile = ssc.sparkContext.textFile(record.toString())

          val rowRDD = csvFile.map(x =>
            getMappedRowFromCsvRecord(structType, x))

        }
      })

-- 
Regards,
Nishant


Re: save a histogram to a file

2015-01-23 Thread Sean Owen
As you can see, the result of histogram() is a pair of arrays, since
of course it's small. It's not necessary and in fact is huge overkill
to make it back into an RDD so you can save it across a bunch of
partitions.

This isn't a job for Spark, but simple Scala code. Off the top of my
head (maybe not 100% right):

import java.io.PrintWriter
val out = new PrintWriter("histogram.csv")
hist._1.zip(hist._2).foreach { case (start, count) =>
  out.println(start + "," + count) }
out.close()

On Fri, Jan 23, 2015 at 12:07 AM, SK skrishna...@gmail.com wrote:
 Hi,
 histogram() returns an object that is a  pair of Arrays. There appears to be
 no saveAsTextFile() for this paired object.

 Currently I am using the following to save the output to a file:

 val hist = a.histogram(10)

 val arr1 = sc.parallelize(hist._1).saveAsTextFile(file1)
 val arr2 = sc.parallelize(hist._2).saveAsTextFile(file2)

 Is there a simpler way to save the histogram() result to a file?

 thanks



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/save-a-histogram-to-a-file-tp21324.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: reducing number of output files

2015-01-23 Thread Sean Owen
It does not necessarily shuffle, yes. I believe it will not if you are
strictly reducing the number of partitions, and do not force a
shuffle. So I think the answer is 'yes'.

If you have a huge number of small files, you can also consider
wholeTextFiles, which gives you entire files of content in each
element of the RDD. It is not necessarily helpful, but thought I'd
mention it, as it could be of interest depending on what you do.

On Fri, Jan 23, 2015 at 2:14 AM, Kane Kim kane.ist...@gmail.com wrote:
 Does it avoid reshuffling? I have 300 thousands output files. If I
 coalesce to the number of cores in the cluster would it keep data
 local? (I have 100 nodes, 4 cores each, does it mean if I
 coalesce(400) it will use all cores and data will stay local)?

 On Thu, Jan 22, 2015 at 3:26 PM, Sean Owen so...@cloudera.com wrote:
 One output file is produced per partition. If you want fewer, use
 coalesce() before saving the RDD.

 On Thu, Jan 22, 2015 at 10:46 PM, Kane Kim kane.ist...@gmail.com wrote:
 How I can reduce number of output files? Is there a parameter to 
 saveAsTextFile?

 Thanks.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext

2015-01-23 Thread Sean Owen
Heh, this question keeps coming up. You can't use a context or RDD
inside a distributed operation, only from the driver. Here you're
trying to call textFile from within foreachPartition.
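
For illustration, a sketch of the driver-side shape of that code, reusing the
names from the snippet below; it assumes the Kafka records are just paths to
CSV files and that getMappedRowFromCsvRecord and table are available on the
driver:

kafkaStream.foreachRDD(rdd => {
  // bring the (small) list of file paths back to the driver first
  val paths = rdd.map(_.toString).collect()
  paths.foreach { path =>
    val csvFile = ssc.sparkContext.textFile(path)  // fine here: runs on the driver
    val rowRDD = csvFile.map(x => getMappedRowFromCsvRecord(table.schema, x))
    // ...save or otherwise use rowRDD
  }
})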

On Fri, Jan 23, 2015 at 10:59 AM, Nishant Patel
nishant.k.pa...@gmail.com wrote:
 Below is code I have written. I am getting NotSerializableException. How can
 I handle this scenario?

 kafkaStream.foreachRDD(rdd = {
   println()
   rdd.foreachPartition(partitionOfRecords = {
 partitionOfRecords.foreach(
   record = {

 //Write for CSV.
 if (true == true) {

   val structType = table.schema
   val csvFile = ssc.sparkContext.textFile(record.toString())

   val rowRDD = csvFile.map(x =
 getMappedRowFromCsvRecord(structType, x))

 }
   })

 --
 Regards,
 Nishant


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: spark 1.1.0 save data to hdfs failed

2015-01-23 Thread ey-chih chow
I looked into the source code of SparkHadoopMapReduceUtil.scala. I think it is 
broken in the following code:
  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
    val klass = firstAvailableClass(
      "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",  // hadoop2, hadoop2-yarn
      "org.apache.hadoop.mapreduce.TaskAttemptContext")           // hadoop1
    val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
  }
In other words, it is related to hadoop2, hadoop2-yarn, and hadoop1.  Any 
suggestion how to resolve it?
Thanks.


 From: so...@cloudera.com
 Date: Fri, 23 Jan 2015 14:01:45 +
 Subject: Re: spark 1.1.0 save data to hdfs failed
 To: eyc...@hotmail.com
 CC: user@spark.apache.org
 
 These are all definitely symptoms of mixing incompatible versions of 
 libraries.
 
 I'm not suggesting you haven't excluded Spark / Hadoop, but, this is
 not the only way Hadoop deps get into your app. See my suggestion
 about investigating the dependency tree.
 
 On Fri, Jan 23, 2015 at 1:53 PM, ey-chih chow eyc...@hotmail.com wrote:
  Thanks.  But I think I already mark all the Spark and Hadoop reps as
  provided.  Why the cluster's version is not used?
 
  Any way, as I mentioned in the previous message, after changing the
  hadoop-client to version 1.2.1 in my maven deps, I already pass the
  exception and go to another one as indicated below.  Any suggestion on this?
 
  =
 
  Exception in thread main java.lang.reflect.InvocationTargetException
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
  sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at
  sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at
  org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
  at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
  Caused by: java.lang.IncompatibleClassChangeError: Implementing class
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
  at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:191)
  at
  org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:73)
  at
  org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.newTaskAttemptContext(SparkHadoopMapReduceUtil.scala:35)
  at
  org.apache.spark.rdd.PairRDDFunctions.newTaskAttemptContext(PairRDDFunctions.scala:53)
  at
  org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:932)
  at
  org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832)
  at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:103)
  at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala)
 
  ... 6 more
 
  

Can't access nested types with sql

2015-01-23 Thread matthes
I am trying to work with nested parquet data. Reading and writing the parquet
file is actually working now, but when I try to query a nested field with
SQLContext I get an exception:

RuntimeException: Can't access nested field in type
ArrayType(StructType(List(StructField(...

I generate the parquet file by parsing the data into the following case class
structure:

case class areas(area : String, dates : Seq[Int])
case class dataset(userid : Long, source : Int, days : Seq[Int] , areas :
Seq[areas] )

automatic generated schema:
root
 |-- userid: long (nullable = false)
 |-- source: integer (nullable = false)
 |-- days: array (nullable = true)
 ||-- element: integer (containsNull = false)
 |-- areas: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- area: string (nullable = true)
 |||-- dates: array (nullable = true)
 ||||-- element: integer (containsNull = false)
 
After writeing the Parquetfile I load the data again and I create a
SQLContext and try to execute a sql-command like follows:

parquetFile.registerTempTable(testtable)
val result = sqlContext.sql(SELECT areas.area FROM testtable where userid 
50)   
result.map(t = t(0)).collect().foreach(println) // throw the exception 

If I execute this command: val result = sqlContext.sql("SELECT areas[0].area
FROM testtable where userid > 50")
I get only the values at the first position in the array, but I need every
value, so that doesn't work for me.
I saw the function t.getAs[...], but everything I tried with it didn't work.

I hope somebody can help me figure out how to access a nested field so that I
can read all the values of the nested array, or tell me whether this isn't supported.

I use spark-sql_2.10(v1.2.0), spark-core_2.10(v1.2.0) and parquet 1.6.0rc4.
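
To make concrete what I mean by reading every value, this is the shape of the
code I am after (a rough, untested sketch; it assumes the array-of-struct
column comes back as a Seq of Rows):

import org.apache.spark.sql.Row

// Rough, untested sketch: select the whole nested column and flatten it on the
// Scala side instead of in the SQL string.
val rows = sqlContext.sql("SELECT areas FROM testtable WHERE userid > 50")
val allAreas = rows.flatMap { row =>
  row.getAs[Seq[Row]](0).map(_.getString(0)) // area is the first field of the struct
}
allAreas.collect().foreach(println)

If a HiveContext is an option, I guess LATERAL VIEW explode(areas) in HiveQL
would be another way to get one row per array element, but I have not verified
that against 1.2.0.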



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-access-nested-types-with-sql-tp21336.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



GroupBy multiple attributes

2015-01-23 Thread Boromir Widas
Hello,

I am trying to do a groupBy on 5 attributes to get results in a form like a
pivot table in Microsoft Excel. The keys are the attribute tuples and the
values are double arrays (possibly very large). Based on the code below, I am
getting back correct results, but I would like to optimize it further (I
played around with numPartitions).

The two issues I see are:
1. flatMap is needed to expand the key tuples, but this also duplicates the
values, and as the values are large this increases the shuffle input size
for reduceByKey - is there a way to avoid the duplication?
2. reduceByKey is adding two arrays element-wise, and creates a new array
for every addition - is there a way to reduce without creating a new array
every time (similar to what accumulators do)? (A sketch of what I mean follows
the sample code below.)

I am pasting sample code, the query plan, and the output below.

Thanks.

val attributeToFloatArrayRDD = sc.parallelize(Array(
  ("A-1", "B-2", "C-1", "D-1", "E-1")   -> (0.0 to 1000.0 by 0.25).toArray
  , ("A-2", "B-1", "C-1", "D-2", "E-1") -> (5.0 to 1005.0 by 0.25).toArray
  , ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
  , ("A-3", "B-3", "C-1", "D-1", "E-2") -> (0.0 to 1000.0 by 0.25).toArray
  , ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
  , ("A-4", "B-3", "C-1", "D-1", "E-1") -> (8.0 to 1008.0 by 0.25).toArray
  , ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
))


val groupToVaRRDD = attributeToFloatArrayRDD
  .flatMap(x => x._1 match {
    case (t1, t2, t3, t4, t5) =>
      Array((t1 + "_top"), (t1, t2), (t1, t2, t3), (t1, t2, t3, t4), (t1, t2, t3, t4, t5)).map(y => (y, x._2))
  })
  .reduceByKey((x, y) => {
    require(x.size == y.size)
    (x, y).zipped.map(_ + _)
  })
  .map(x => {
    (x._1, x._2.sorted.take(x._2.size / 20).last)
  })
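
To make point 2 concrete, this is the shape of what I have in mind (a rough,
untested sketch built on the same expanded keys; it assumes every value array
has the same length, which the require already checks): aggregateByKey should
let the per-key buffer be mutated in place, so the running sum is allocated
once per key instead of once per addition.

// Rough, untested sketch for point 2: reuse a mutable per-key buffer via
// aggregateByKey instead of allocating a new array on every reduce step.
val expanded = attributeToFloatArrayRDD.flatMap(x => x._1 match {
  case (t1, t2, t3, t4, t5) =>
    Array((t1 + "_top"), (t1, t2), (t1, t2, t3), (t1, t2, t3, t4), (t1, t2, t3, t4, t5))
      .map(y => (y, x._2))
})

val summed = expanded.aggregateByKey(Array.empty[Double])(
  (acc, arr) =>
    if (acc.isEmpty) arr.clone() // first array seen for this key: copy it once
    else { var i = 0; while (i < acc.length) { acc(i) += arr(i); i += 1 }; acc },
  (a, b) =>
    if (a.isEmpty) b
    else if (b.isEmpty) a
    else { var i = 0; while (i < a.length) { a(i) += b(i); i += 1 }; a }
)

val groupToVaR = summed.map(x => (x._1, x._2.sorted.take(x._2.size / 20).last))

For point 1, I am also wondering whether reducing on the full 5-tuple key first
and only then expanding the already-reduced (and much smaller) per-key arrays
into the prefix keys would keep the big arrays from going through the shuffle
five times.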


Query Plan

(16) MappedRDD[12] at map at GroupByTest.scala:81 []
 |   ShuffledRDD[11] at reduceByKey at GroupByTest.scala:76 []
 +-(16) FlatMappedRDD[10] at flatMap at GroupByTest.scala:68 []
    |   ParallelCollectionRDD[9] at parallelize at GroupByTest.scala:56 []

Output


GroupBy VaR
(A-2,B-1)   54.75
(A-2,B-1,C-1,D-2)   54.75
(A-1,B-1)   149.25
(A-1,B-1,C-1,D-1,E-1)   149.25
(A-3,B-3,C-1)   49.75
(A-3,B-3)   49.75
(A-4,B-3,C-1,D-1,E-1)   57.75
(A-2,B-1,C-1)   54.75
(A-1,B-2,C-1,D-1,E-1)   49.75
(A-1,B-1,C-1,D-1)   149.25
(A-3,B-3,C-1,D-1,E-2)   49.75
(A-1,B-2,C-1)   49.75
(A-3,B-3,C-1,D-1)   49.75
(A-4,B-3)   57.75
(A-1,B-1,C-1)   149.25
A-1_top 199.0
(A-4,B-3,C-1,D-1)   57.75
A-2_top 54.75
(A-1,B-2)   49.75
(A-4,B-3,C-1)   57.75
A-3_top 49.75
A-4_top 57.75
(A-2,B-1,C-1,D-2,E-1)   54.75
(A-1,B-2,C-1,D-1)   49.75


RE: spark 1.1.0 save data to hdfs failed

2015-01-23 Thread ey-chih chow
I also think the code is not robust enough. First, since Spark works with
hadoop1, why does the code try hadoop2 first? Also, the following code only
handles ClassNotFoundException; it should handle all the exceptions.
  private def firstAvailableClass(first: String, second: String): Class[_] = {
    try {
      Class.forName(first)
    } catch {
      case e: ClassNotFoundException =>
        Class.forName(second)
    }
  }
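
Something along the lines of this untested sketch (not the actual Spark source)
is what I have in mind: also fall back when the first class fails to load with
a LinkageError, so a hadoop1/hadoop2 mix-up would at least try the other class
instead of dying here.

  // Untested sketch, not the real Spark code: also fall back on LinkageError
  // (e.g. IncompatibleClassChangeError), not only on a missing class.
  private def firstAvailableClass(first: String, second: String): Class[_] =
    try {
      Class.forName(first)
    } catch {
      case _: ClassNotFoundException | _: LinkageError =>
        Class.forName(second)
    }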
From: eyc...@hotmail.com
To: so...@cloudera.com
CC: user@spark.apache.org
Subject: RE: spark 1.1.0 save data to hdfs failed
Date: Fri, 23 Jan 2015 06:43:00 -0800




I looked into the source code of SparkHadoopMapReduceUtil.scala. I think it is 
broken in the following code:
  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
    val klass = firstAvailableClass(
        "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",  // hadoop2, hadoop2-yarn
        "org.apache.hadoop.mapreduce.TaskAttemptContext")           // hadoop1
    val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
  }
In other words, it is related to hadoop2, hadoop2-yarn, and hadoop1. Any
suggestions on how to resolve it?
Thanks.
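
As a side note, something like this quick, untested snippet should show which
of the two TaskAttemptContext classes is actually loadable at runtime and which
jar it is served from, which might pin down where the mixed versions come from:

// Quick, untested probe: try to load each candidate class and print the jar
// (code source) it resolves to, or the error if it cannot be loaded.
Seq("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",
    "org.apache.hadoop.mapreduce.TaskAttemptContext").foreach { name =>
  try {
    val c = Class.forName(name)
    val where = Option(c.getProtectionDomain.getCodeSource).map(_.getLocation).orNull
    println(s"$name -> loaded from $where")
  } catch {
    case e: Throwable => println(s"$name -> " + e)
  }
}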


 From: so...@cloudera.com
 Date: Fri, 23 Jan 2015 14:01:45 +
 Subject: Re: spark 1.1.0 save data to hdfs failed
 To: eyc...@hotmail.com
 CC: user@spark.apache.org
 
 These are all definitely symptoms of mixing incompatible versions of 
 libraries.
 
 I'm not suggesting you haven't excluded Spark / Hadoop, but, this is
 not the only way Hadoop deps get into your app. See my suggestion
 about investigating the dependency tree.
 
 On Fri, Jan 23, 2015 at 1:53 PM, ey-chih chow eyc...@hotmail.com wrote:
  Thanks. But I think I already marked all the Spark and Hadoop deps as
  provided. Why is the cluster's version not used?
 
  Anyway, as I mentioned in the previous message, after changing hadoop-client
  to version 1.2.1 in my Maven deps, I get past that exception but hit another
  one, as indicated below. Any suggestions on this?
 
  =
 
  Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
  Caused by: java.lang.IncompatibleClassChangeError: Implementing class
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:191)
    at org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:73)
    at org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.newTaskAttemptContext(SparkHadoopMapReduceUtil.scala:35)
    at org.apache.spark.rdd.PairRDDFunctions.newTaskAttemptContext(PairRDDFunctions.scala:53)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:932)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832)
    at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:103)
    at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala)
    ... 6 more
 

  

Re: How to make spark partition sticky, i.e. stay with node?

2015-01-23 Thread mingyu
I found a workaround.
I can make my auxiliary data an RDD, partition it, and cache it.
Later, I can cogroup it with other RDDs, and Spark will try to keep the
cached RDD's partitions where they are and not shuffle them.
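
Roughly, the workaround looks like the untested sketch below (the RDD names are
placeholders and both are assumed to be key/value RDDs): partition and cache
the auxiliary RDD once, and reuse the same partitioner on the other side of the
cogroup so the cached side does not get shuffled again.

import org.apache.spark.HashPartitioner

// Untested sketch; auxiliaryRDD and otherRDD are placeholders for my real data.
val partitioner = new HashPartitioner(32)

// Partition the auxiliary data once and pin it in the cache.
val auxiliary = auxiliaryRDD.partitionBy(partitioner).cache()
auxiliary.count() // materialize the cached partitions before the real jobs run

// Both sides share the partitioner, so cogroup does not move the cached side.
val grouped = auxiliary.cogroup(otherRDD.partitionBy(partitioner))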



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-spark-partition-sticky-i-e-stay-with-node-tp21322p21338.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org