Re: Broadcast variables in R
There shouldn't be anything Mac OS specific about this feature. One point of warning though -- as mentioned previously in this thread, the APIs were made private because we aren't sure we will be supporting them in the future. If you are using these APIs, it would be good to chime in on the JIRA with your use case. Thanks Shivaram On Tue, Jul 21, 2015 at 2:34 AM, Serge Franchois serge.franch...@altran.com wrote: I might add to this that I've done the same exercise on Linux (CentOS 6) and there, broadcast variables ARE working. Is this functionality perhaps not exposed on Mac OS X? Or does it have to do with the fact that there are no native Hadoop libs for Mac?
RE: The auxService:spark_shuffle does not exist
Hi Andrew Or, Yes, NodeManager was restarted. I also checked the logs to see if the JARs appear in the CLASSPATH. I have also downloaded the binary distribution and used the JAR spark-1.4.1-bin-hadoop2.4/lib/spark-1.4.1-yarn-shuffle.jar without success. Has anyone successfully enabled spark_shuffle via the documentation https://spark.apache.org/docs/1.4.1/job-scheduling.html? I'm testing it on Hadoop 2.4.1. Any feedback or suggestions are appreciated, thanks. Date: Fri, 17 Jul 2015 15:35:29 -0700 Subject: Re: The auxService:spark_shuffle does not exist From: and...@databricks.com To: alee...@hotmail.com CC: zjf...@gmail.com; rp...@njit.edu; user@spark.apache.org Hi all, Did you forget to restart the node managers after editing yarn-site.xml by any chance? -Andrew 2015-07-17 8:32 GMT-07:00 Andrew Lee alee...@hotmail.com: I have encountered the same problem after following the document. Here's my spark-defaults.conf:

spark.shuffle.service.enabled true
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.executorIdleTimeout 60
spark.dynamicAllocation.cachedExecutorIdleTimeout 120
spark.dynamicAllocation.initialExecutors 2
spark.dynamicAllocation.maxExecutors 8
spark.dynamicAllocation.minExecutors 1
spark.dynamicAllocation.schedulerBacklogTimeout 10

and yarn-site.xml configured:

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>spark_shuffle,mapreduce_shuffle</value>
</property>
...
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>

and deployed the 2 JARs to the NodeManager's classpath /opt/hadoop/share/hadoop/mapreduce/. (I also checked the NodeManager log and the JARs appear in the classpath.) I notice that the JAR location is not the same as in the 1.4 documentation. I found them under network/yarn/target and network/shuffle/target/ after building with -Phadoop-2.4 -Psparkr -Pyarn -Phive -Phive-thriftserver in Maven:

spark-network-yarn_2.10-1.4.1.jar
spark-network-shuffle_2.10-1.4.1.jar

and still get the following exception:

Exception in thread ContainerLauncher #0 java.lang.Error: org.apache.spark.SparkException: Exception while starting container container_1437141440985_0003_01_02 on host alee-ci-2058-slave-2.test.foo.com
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.spark.SparkException: Exception while starting container container_1437141440985_0003_01_02 on host alee-ci-2058-slave-2.test.foo.com
at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:116)
at org.apache.spark.deploy.yarn.ExecutorRunnable.run(ExecutorRunnable.scala:67)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) ...
2 more
Caused by: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:152)
at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)

Not sure what else I am missing here or doing wrong. Appreciate any insights or feedback, thanks. Date: Wed, 8 Jul 2015 09:25:39 +0800 Subject: Re: The auxService:spark_shuffle does not exist From: zjf...@gmail.com To: rp...@njit.edu CC: user@spark.apache.org Did you enable dynamic resource allocation? You can refer to this page for how to configure the Spark shuffle service for YARN: https://spark.apache.org/docs/1.4.0/job-scheduling.html On Tue, Jul 7, 2015 at 10:55 PM, roy rp...@njit.edu wrote: we tried --master yarn-client with no different result. -- Best Regards Jeff Zhang
RE: Would driver shutdown cause app dead?
ZhuGe, If you run your program in the cluster deploy-mode you get resiliency against driver failure, though there are some steps you have to take in how you write your streaming job to allow for transparent resume. Netflix did a nice writeup of this resiliency here: http://techblog.netflix.com/2015/03/can-spark-streaming-survive-chaos-monkey.html. If you tie in ZooKeeper you can also get resiliency against Master failure, which is documented here: http://spark.apache.org/docs/1.4.0/spark-standalone.html. Regards, -- Matthew From: ZhuGe [t...@outlook.com] Sent: Tuesday, July 21, 2015 3:07 AM To: user@spark.apache.org Subject: Would driver shutdown cause app dead? Hi all: I am a bit confused about the work of the driver. In our production environment, we have a Spark Streaming app running in standalone mode. What we are concerned about is: if the driver shuts down accidentally (host shutdown or whatever), would the app keep running normally? Any explanation would be appreciated!! Cheers
Re: Spark Streaming Checkpointing solutions
TD's Spark Summit talk offers suggestions ( https://spark-summit.org/2015/events/recipes-for-running-spark-streaming-applications-in-production/). He recommends using HDFS, because you get the triplicate resiliency it offers, albeit with extra overhead. I believe the driver doesn't need visibility to the checkpointing directory, e.g., if you're running in client mode, but all the cluster nodes would need to see it for recovering a lost stage, where it might get started on a different node. Hence, I would think NFS could work, if all nodes have the same mount, although there would be a lot of network overhead. In some situations, a high-performance file system appliance, e.g., NAS, could suffice. My $0.02, dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Jul 21, 2015 at 10:43 AM, Emmanuel fortin.emman...@gmail.com wrote: Hi, I'm working on a Spark Streaming application and I would like to know what is the best storage to use for checkpointing. For testing purposes we are using NFS between the workers, the master and the driver program (in client mode), but we have some issues with the CheckpointWriter (1 dedicated thread). *My understanding is that NFS is not a good candidate for this usage.* 1. What is the best solution for checkpointing and what are the alternatives? 2. Do checkpointing directories need to be shared by the driver application and the workers too? Thanks for your replies
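As a rough sketch of wiring this up in Scala, assuming an HDFS checkpoint directory (the path, app name, and batch interval below are placeholders, not anything from this thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://namenode:8020/user/spark/checkpoints"  // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("checkpointed-app")  // placeholder name
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)  // metadata and RDD checkpoints are written here
  // ... define the streaming computation here ...
  ssc
}

// on a driver restart, getOrCreate rebuilds the context from the checkpoint if one exists
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()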
Re: Spark MLlib instead of Mahout - collaborative filtering model
I have never used Mahout, so I cannot compare the two. Spark MLlib, however, provides matrix-factorization-based Collaborative Filtering http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html using the Alternating Least Squares algorithm. Also, Singular Value Decomposition is handled by MLlib under the Dimensionality Reduction http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html section, and Pearson/Spearman correlation is provided under Basic Statistics http://spark.apache.org/docs/latest/mllib-statistics.html.
Re: user threads in executors
You can certainly create threads in a map transformation. We do this to do concurrent DB lookups during one stage for example. I would recommend, however, that you switch to mapPartitions from map, as this allows you to create a fixed-size thread pool to share across items in a partition, as opposed to spawning a future per record in the RDD. On Tue, Jul 21, 2015 at 4:11 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi, Can I create user threads in executors? I have a streaming app where after processing I have a requirement to push events to an external system. Each POST request costs ~90-100 ms. To make the posts parallel, I cannot use the same thread because that is limited by the number of cores available in the system; can I use user threads in a Spark app? I tried creating 2 threads in a map task and it worked. Is there any upper limit on the number of user threads in a Spark executor? Is it a good idea to create user threads in a Spark map task? Thanks -- *Richard Marscher* Software Engineer Localytics Localytics.com http://localytics.com/ | Our Blog http://localytics.com/blog | Twitter http://twitter.com/localytics | Facebook http://facebook.com/localytics | LinkedIn http://www.linkedin.com/company/1148792?trk=tyah
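To make the mapPartitions suggestion concrete, here is a rough sketch against an RDD of event strings; the pool size, timeout, and postEvent are placeholders for your own HTTP client code, not anything from this thread:

import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

def postEvent(event: String): Int = ???  // placeholder for the actual HTTP POST

val responses = events.mapPartitions { records =>
  // one fixed-size pool per partition, shared by all records in it
  val pool = Executors.newFixedThreadPool(8)  // placeholder size
  implicit val ec = ExecutionContext.fromExecutorService(pool)
  val futures = records.map(r => Future { postEvent(r) }).toList  // submit all posts
  val results = futures.map(f => Await.result(f, 10.seconds))     // wait for completion
  pool.shutdown()
  results.iterator
}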
Re: Spark Streaming Checkpointing solutions
Thank you for your reply. I will consider HDFS for the checkpoint storage. On Tue, Jul 21, 2015 at 17:51, Dean Wampler deanwamp...@gmail.com wrote: TD's Spark Summit talk offers suggestions ( https://spark-summit.org/2015/events/recipes-for-running-spark-streaming-applications-in-production/). He recommends using HDFS, because you get the triplicate resiliency it offers, albeit with extra overhead. I believe the driver doesn't need visibility to the checkpointing directory, e.g., if you're running in client mode, but all the cluster nodes would need to see it for recovering a lost stage, where it might get started on a different node. Hence, I would think NFS could work, if all nodes have the same mount, although there would be a lot of network overhead. In some situations, a high-performance file system appliance, e.g., NAS, could suffice. My $0.02, dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Jul 21, 2015 at 10:43 AM, Emmanuel fortin.emman...@gmail.com wrote: Hi, I'm working on a Spark Streaming application and I would like to know what is the best storage to use for checkpointing. For testing purposes we are using NFS between the workers, the master and the driver program (in client mode), but we have some issues with the CheckpointWriter (1 dedicated thread). *My understanding is that NFS is not a good candidate for this usage.* 1. What is the best solution for checkpointing and what are the alternatives? 2. Do checkpointing directories need to be shared by the driver application and the workers too? Thanks for your replies
Convert Simple Kafka Consumer to standalone Spark JavaStream Consumer
Hi, I have a simple high-level Kafka consumer like:

package matchinguu.kafka.consumer;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.*;

public class SimpleHLConsumer {

    private final ConsumerConnector consumer;
    private final String topic;

    public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println();
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }
        if (consumer != null) {
            System.out.println("Shutdown Happens");
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Consumer is now reading messages from producer");
        //String topic = args[0];
        String topic = "test";
        SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic);
        simpleHLConsumer.testConsumer();
    }
}

I want to get my messages through Spark Java Streaming with Kafka integration. Can anyone help me rework this code so that I can get the same output with the Spark Kafka integration?
Accumulator value 0 in driver
I am using Accumulators in a JavaRDD<LabeledPoint>.flatMap() method. I have logged the localValue() of the Accumulator object and their values are non-zero. In the driver, after the .flatMap() method returns, calling value() on the Accumulator yields 0. I am running 1.4.0 in yarn-client mode. Any pointers would be appreciated.
SparkR sqlContext or sc not found in RStudio
Hi, I could successfully install the SparkR package into my RStudio, but I could not execute anything against sc or sqlContext. I did the following:

Sys.setenv(SPARK_HOME="/path/to/spark-1.4.1")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)

The above code loads the package, and when I type the following I get Spark references, which shows my installation is correct:

sc
Java ref type org.apache.spark.api.java.JavaSparkContext id 0
sparkRSQL.init(sc)
Java ref type org.apache.spark.sql.SQLContext id 3

But when I try to execute anything against sc or sqlContext it says the object is not found. E.g.:

df <- createDataFrame(sqlContext, faithful)

It fails saying sqlContext is not found. Don't know what is wrong with the setup, please guide. Thanks in advance.
Re: Accumulator value 0 in driver
Have you called collect() / count() on the RDD following flatMap() ? Cheers On Tue, Jul 21, 2015 at 8:47 AM, dlmar...@comcast.net wrote: I am using Accumulators in a JavaRDD<LabeledPoint>.flatMap() method. I have logged the localValue() of the Accumulator object and their values are non-zero. In the driver, after the .flatMap() method returns, calling value() on the Accumulator yields 0. I am running 1.4.0 in yarn-client mode. Any pointers would be appreciated.
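For reference, a minimal sketch of the lazy-evaluation pitfall behind this (names are illustrative; sc and rdd are assumed to exist):

val acc = sc.accumulator(0L, "processed")
val expanded = rdd.flatMap { x =>
  acc += 1L   // runs only when an action triggers the stage
  Seq(x)
}
expanded.count()    // the action that actually executes the flatMap
println(acc.value)  // only now reflects the updates on the driver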
Re: Timestamp functions for sqlContext
Hi Tal, I'm not sure there is currently a built-in function for it, but you can easily define a UDF (user defined function) by extending org.apache.spark.sql.api.java.UDF1, registering it (sqlContext.udf().register(...)), and then using it inside your query. RK. On Tue, Jul 21, 2015 at 7:04 PM Tal Rozen t...@scaleka.com wrote: Hi, I'm running a query with SQL context where one of the fields is of type java.sql.Timestamp. I'd like to use a function similar to DATEDIFF in MySQL, between the date given in each row, and now. So if I were able to use the same syntax as in MySQL it would be:

val date_diff_df = sqlContext.sql("select DATEDIFF(curdate(), rowTimestamp) date_diff from tableName")

What are the relevant keywords to replace curdate() and DATEDIFF? Thanks
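In Scala, for instance, a hedged sketch of registering such a UDF directly (the function name and the day arithmetic are made up for illustration; sqlContext is assumed to exist):

import java.sql.Timestamp

sqlContext.udf.register("dateDiffDays", (ts: Timestamp) => {
  val msPerDay = 24L * 60 * 60 * 1000
  ((System.currentTimeMillis() - ts.getTime) / msPerDay).toInt  // whole days between now and ts
})

val dateDiffDf = sqlContext.sql(
  "select dateDiffDays(rowTimestamp) date_diff from tableName")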
Re: What else is need to setup native support of BLAS/LAPACK with Spark?
FWIW I've run into similar BLAS-related problems before and wrote up a document on how to do this for Spark EC2 clusters at https://github.com/amplab/ml-matrix/blob/master/EC2.md -- Note that this works with a vanilla Spark build (you only need to link to netlib-lgpl in your app) but requires the app jar to be present on all the machines. Thanks Shivaram On Tue, Jul 21, 2015 at 7:37 AM, Arun Ahuja aahuj...@gmail.com wrote: Yes, I imagine it's the driver's classpath - I'm pulling those screenshots straight from the Spark UI environment page. Is there somewhere else to grab the executor classpath? Also, the warning is only printed once, so it's also not clear whether the warning is from the driver or the executor; would you know? Thanks, Arun On Tue, Jul 21, 2015 at 7:52 AM, Sean Owen so...@cloudera.com wrote: Great, and that file exists on HDFS and is world readable? Just double-checking. What classpath is this -- your driver or executor? This is the driver, no? I assume so just because it looks like it references the assembly you built locally and from which you're launching the driver. I think we're concerned with the executors and what they have on the classpath. I suspect there is still a problem somewhere in there. On Mon, Jul 20, 2015 at 4:59 PM, Arun Ahuja aahuj...@gmail.com wrote: Cool, I tried that as well, and it doesn't seem different: spark.yarn.jar seems set [image: Inline image 1] This actually doesn't change the classpath; not sure if it should: [image: Inline image 3] But same netlib warning. Thanks for the help! - Arun On Fri, Jul 17, 2015 at 3:18 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Can you try setting the spark.yarn.jar property to make sure it points to the jar you're thinking of? -Sandy On Fri, Jul 17, 2015 at 11:32 AM, Arun Ahuja aahuj...@gmail.com wrote: Yes, it's a YARN cluster, using spark-submit to run. I have SPARK_HOME set to the directory above and am using the spark-submit script from there.

bin/spark-submit --master yarn-client --executor-memory 10g --driver-memory 8g --num-executors 400 --executor-cores 1 --class org.hammerlab.guacamole.Guacamole --conf spark.default.parallelism=4000 --conf spark.storage.memoryFraction=0.15

libgfortran.so.3 is also there:

ls /usr/lib64/libgfortran.so.3
/usr/lib64/libgfortran.so.3

These are the jniloader files in the jar:

jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep jniloader
META-INF/maven/com.github.fommil/jniloader/
META-INF/maven/com.github.fommil/jniloader/pom.xml
META-INF/maven/com.github.fommil/jniloader/pom.properties

Thanks, Arun On Fri, Jul 17, 2015 at 1:30 PM, Sean Owen so...@cloudera.com wrote: Make sure /usr/lib64 contains libgfortran.so.3; that's really the issue. I'm pretty sure the answer is 'yes', but make sure the assembly has jniloader too. I don't see why it wouldn't, but that's needed. What is your env like -- local, standalone, YARN? How are you running? Just want to make sure you are using this assembly across your cluster. On Fri, Jul 17, 2015 at 6:26 PM, Arun Ahuja aahuj...@gmail.com wrote: Hi Sean, Thanks for the reply!
I did double-check that the jar is the one I think I am running: [image: Inline image 2]

jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native
com/github/fommil/netlib/NativeRefARPACK.class
com/github/fommil/netlib/NativeRefBLAS.class
com/github/fommil/netlib/NativeRefLAPACK.class
com/github/fommil/netlib/NativeSystemARPACK.class
com/github/fommil/netlib/NativeSystemBLAS.class
com/github/fommil/netlib/NativeSystemLAPACK.class

Also, I checked the gfortran version on the cluster nodes; it is available and is 5.1:

$ gfortran --version
GNU Fortran (GCC) 5.1.0
Copyright (C) 2015 Free Software Foundation, Inc.

and I still see:

15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK

Does anything need to be adjusted in my application POM? Thanks, Arun On Thu, Jul 16, 2015 at 5:26 PM, Sean Owen so...@cloudera.com wrote: Yes, that's most of the work, just getting the native libs into the assembly. netlib can find them from there even if you don't have BLAS libs on your OS, since it includes a reference implementation as a fallback. One common reason it won't load is not having libgfortran installed on your OSes though. It has to be 4.6+ too. That can't be shipped even in netlib and has to exist on your hosts. The other
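For reference, linking netlib-java's native loaders from an application POM is usually done through the com.github.fommil.netlib "all" artifact; the version below is illustrative, so check it against your build:

<dependency>
  <groupId>com.github.fommil.netlib</groupId>
  <artifactId>all</artifactId>
  <version>1.1.2</version>
  <type>pom</type>
</dependency>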
Timestamp functions for sqlContext
Hi, I'm running a query with SQL context where one of the fields is of type java.sql.Timestamp. I'd like to use a function similar to DATEDIFF in MySQL, between the date given in each row, and now. So if I were able to use the same syntax as in MySQL it would be:

val date_diff_df = sqlContext.sql("select DATEDIFF(curdate(), rowTimestamp) date_diff from tableName")

What are the relevant keywords to replace curdate() and DATEDIFF? Thanks
query over hive context hangs, please help
The thread dump is below; it seems to hang on accessing the MySQL metastore. I googled and found a bug related to com.mysql.jdbc.util.ReadAheadInputStream, but I don't have a workaround, and I am not sure it is the same issue. Please help me, thanks.

--- thread dump ---
MyAppDefaultScheduler_Worker-2 prio=10 tid=0x7f5e50463000 nid=0xd17f runnable [0x7f5e06ff9000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at com.mysql.jdbc.util.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:156)
at com.mysql.jdbc.util.ReadAheadInputStream.read(ReadAheadInputStream.java:187)
- locked 0x00060b6aaba0 (a com.mysql.jdbc.util.ReadAheadInputStream)
at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:3158)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3671)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3604)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4149)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2615)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2776)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2834)
- locked 0x00060b6493a8 (a com.mysql.jdbc.JDBC4Connection)
at com.mysql.jdbc.ConnectionImpl.rollbackNoChecks(ConnectionImpl.java:5200)
at com.mysql.jdbc.ConnectionImpl.rollback(ConnectionImpl.java:5083)
- locked 0x00060b6493a8 (a com.mysql.jdbc.JDBC4Connection)
at com.jolbox.bonecp.ConnectionHandle.rollback(ConnectionHandle.java:1272)
at org.datanucleus.store.rdbms.ConnectionFactoryImpl$EmulatedXAResource.rollback(ConnectionFactoryImpl.java:705)
at org.datanucleus.transaction.Transaction.rollback(Transaction.java:548)
at org.datanucleus.transaction.TransactionManager.rollback(TransactionManager.java:85)
at org.datanucleus.TransactionImpl.internalRollback(TransactionImpl.java:517)
- locked 0x000614b7e3c0 (a org.datanucleus.TransactionImpl)
at org.datanucleus.TransactionImpl.rollback(TransactionImpl.java:445)
at org.datanucleus.api.jdo.JDOTransaction.rollback(JDOTransaction.java:182)
at org.apache.hadoop.hive.metastore.ObjectStore.rollbackTransaction(ObjectStore.java:438)
at org.apache.hadoop.hive.metastore.ObjectStore$GetHelper.close(ObjectStore.java:2278)
at org.apache.hadoop.hive.metastore.ObjectStore$GetHelper.run(ObjectStore.java:2225)
at org.apache.hadoop.hive.metastore.ObjectStore.getPartitionsInternal(ObjectStore.java:1559)
at org.apache.hadoop.hive.metastore.ObjectStore.getPartitions(ObjectStore.java:1553)
at sun.reflect.GeneratedMethodAccessor57.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:108)
at com.sun.proxy.$Proxy20.getPartitions(Unknown Source)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_partitions(HiveMetaStore.java:2516)
at sun.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
at com.sun.proxy.$Proxy21.get_partitions(Unknown Source)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitions(HiveMetaStoreClient.java:856)
at sun.reflect.GeneratedMethodAccessor55.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
at com.sun.proxy.$Proxy22.listPartitions(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.getAllPartitionsOf(Hive.java:1782)
at org.apache.spark.sql.hive.HiveShim$.getAllPartitionsOf(Shim13.scala:354)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:226)
- locked 0x000619304820 (a org.apache.spark.sql.hive.HiveContext$$anon$3)
at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:262)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:161)
at
How to build Spark with my own version of Hadoop?
Hi, I have modified some Hadoop code and want to build Spark against the modified version of Hadoop. Do I need to change the build dependency files? If so, how? Many thanks!
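One common approach, sketched here under the assumption that you bump your modified Hadoop's POM version to something like 2.4.1-custom: install it into the local Maven repository, then point Spark's build at that version with -Dhadoop.version (the profile and version strings below are illustrative):

# in the modified Hadoop source tree
mvn install -DskipTests

# in the Spark source tree
build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.1-custom -DskipTests clean package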
many-to-many join
Quick example problem that's stumping me:
* Users have 1 or more phone numbers and therefore one or more area codes.
* There are 100M users.
* States have one or more area codes.
* I would like to get the states for the users (as indicated by phone area code).

I was thinking about something like this: if area_code_user looks like (area_code, [user_id]), ex: (615, [1234567]), and area_code_state looks like (area_code, state), ex: (615, [Tennessee]), then we could do:

states_and_users_mixed = area_code_user.join(area_code_state) \
    .reduceByKey(lambda a, b: a + b) \
    .values()
user_state_pairs = states_and_users_mixed.flatMap(emit_cartesian_prod_of_userids_and_states)
user_to_states = user_state_pairs.reduceByKey(lambda a, b: a + b)
user_to_states.first(1)
(1234567, [Tennessee, Tennessee, California])

This would work, but user_state_pairs is just a list of user_ids and state names mixed together, and emit_cartesian_prod_of_userids_and_states has to correctly pair them. This is problematic because 1) it's weird and sloppy and 2) there will be lots of users per state, and having so many users in a single row is going to make emit_cartesian_prod_of_userids_and_states work extra hard to first locate states and then emit all userid-state pairs. How should I be doing this? Thanks, -John
Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing
Yeah, I'm referring to that API. If you want to filter messages in addition to catching that exception, have your messageHandler return an Option, so the type R would end up being Option[WhateverYourClassIs], then filter out None before doing the rest of your processing. If you aren't already recording offsets somewhere, and need to find the offsets at the beginning of the topic, you can take a look at this https://github.com/apache/spark/blob/branch-1.3/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L143 as an example of querying offsets from Kafka. That code is private, but you can either use it as an example, or remove the private[spark] and recompile just the spark-streaming-kafka package. That artifact is included in your job assembly, so you won't have to redeploy Spark if you go that route. On Tue, Jul 21, 2015 at 6:42 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hi Cody, Thanks for your answer. I'm on Spark 1.3.0. I don't quite understand how to use the messageHandler parameter/function in the createDirectStream method. You are referring to this, aren't you?

def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag,
    R: ClassTag] (
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  fromOffsets: Map[TopicAndPartition, Long],
  messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
  new DirectKafkaInputDStream[K, V, KD, VD, R](
    ssc, kafkaParams, fromOffsets, messageHandler)
}

So I must supply the fromOffsets parameter too, but how do I tell this method to read from the beginning of my topic? If I have a filter (e.g. on an R.date field) on my R class, can I put the filter in the messageHandler function too? Regards, Nicolas P.
On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hello, Using the old Spark Streaming Kafka API, I got the following around the same offset:

kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633)
at kafka.message.Message.ensureValid(Message.scala:166)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I found some old threads about possibly corrupt Kafka messages produced by the new producer API with Snappy compression on. My question is: is it possible to skip/ignore those offsets when doing a full reprocessing with KafkaUtils.createStream or KafkaUtils.createDirectStream? Regards, Nicolas PHUNG On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger c...@koeninger.org wrote: I'd try logging the offsets for each message, see where problems start, then try using the console consumer starting at those offsets and see if you can reproduce the problem. On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hi Cody, Thanks for your help. It seems there's something wrong with some messages within my Kafka topics then. I don't understand how I can get bigger or incomplete messages, since I use the default configuration to accept only 1MB messages in my Kafka topic. If you have any other information or suggestions, please tell me. Regards, Nicolas
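To sketch Cody's suggestion against the Spark 1.3 direct stream API: MyRecord, parse, the broker address, topic, and offsets below are placeholders, not anything from this thread, and an existing StreamingContext ssc is assumed:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

case class MyRecord(date: Long)                      // placeholder record type
def parse(s: String): MyRecord = MyRecord(s.toLong)  // placeholder decoding logic

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val fromOffsets = Map(TopicAndPartition("mytopic", 0) -> 0L)  // offset 0 = beginning of the partition

// return None for anything that fails to decode, instead of crashing the job
val handler = (mmd: MessageAndMetadata[String, String]) =>
  try Some(parse(mmd.message())) catch { case _: Exception => None }

val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, Option[MyRecord]](
  ssc, kafkaParams, fromOffsets, handler)

val records = stream.flatMap(o => o)  // drops the Nones before further processing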
Re: SparkR sqlContext or sc not found in RStudio
Hi, thanks for the reply. I did download it from GitHub and build it, and it is working fine; I can use spark-submit etc. But when I use it in RStudio I don't know why it is saying sqlContext not found. When I do the following:

sqlContext <- sparkRSQL.init(sc)
Error: object sqlContext not found

If I do the following:

sparkRSQL.init(sc)
Java ref type org.apache.spark.sql.SQLContext id 3

I don't know what's wrong here.
Spark Streaming Checkpointing solutions
Hi, I'm working on a Spark Streaming application and I would like to know what is the best storage to use for checkpointing. For testing purposes we are using NFS between the workers, the master and the driver program (in client mode), but we have some issues with the CheckpointWriter (1 dedicated thread). *My understanding is that NFS is not a good candidate for this usage.* 1. What is the best solution for checkpointing and what are the alternatives? 2. Do checkpointing directories need to be shared by the driver application and the workers too? Thanks for your replies
pyspark equivalent to Extends Serializable
I'm trying to define a class that contains as attributes some of Spark's objects, and am running into a problem that I think would be solved if I could find Python's equivalent of Scala's extends Serializable. Here's a simple class that has a Spark RDD as one of its attributes:

class Foo:
    def __init__(self):
        self.rdd = sc.parallelize([1, 2, 3, 4, 5])

    def combine(self, first, second):
        return first + second

    def f1(self):
        return self.rdd.reduce(lambda a, b: self.combine(a, b))

When I try:

b = Foo()
b.f1()

I get the error: PicklingError: Can't pickle builtin type 'method_descriptor'. My guess is that this has to do with serialization of the class I created and an error there. So how can I use Spark's RDD methods (such as reduce()) in conjunction with the methods of the class I've created (combine() here)?
Re: No. of Task vs No. of Executors
Thanks all! Thanks Ayan! I did a repartition to 20, so it used all cores in the cluster and was done in 3 minutes. It seems the data was skewed to that one partition. On Tue, Jul 14, 2015 at 8:05 PM, ayan guha guha.a...@gmail.com wrote: Hi As you can see, Spark has taken data locality into consideration and thus scheduled all tasks as node-local. Because Spark could run each task on a node where its data is present, it went ahead and scheduled the tasks there. This is actually good for reading. If you really want to fan out processing, you may do a repartition(n). Regarding slowness, as you can see, another task completed successfully in 6 mins on executor id 2, so it does not seem that the node itself is slow. It is possible the computation for one partition is skewed. You may want to switch on speculative execution to see whether the same task gets completed faster on another node or not. If yes, then it's a node issue; else, most likely a data issue. On Tue, Jul 14, 2015 at 11:43 PM, shahid sha...@trialx.com wrote: hi I have a 10 node cluster. I loaded the data onto HDFS, so the no. of partitions I get is 9. I am running a Spark application; it gets stuck on one of the tasks. Looking at the UI, it seems the application is not using all nodes to do calculations. Attached is the screenshot of tasks; it seems tasks are put on each node more than once. Looking at the tasks, 8 tasks get completed under 7-8 minutes and one task takes around 30 minutes, causing the delay in results. http://apache-spark-user-list.1001560.n3.nabble.com/file/n23824/Screen_Shot_2015-07-13_at_9.png -- Best Regards, Ayan Guha -- with Regards Shahid Ashraf
Re: The auxService:spark_shuffle does not exist
Hi Andrew, Based on your driver logs, it seems the issue is that the shuffle service is actually not running on the NodeManagers, but your application is trying to provide a spark_shuffle secret anyway. One way to verify whether the shuffle service is actually started is to look at the NodeManager logs for the following lines:

*Initializing YARN shuffle service for Spark*
*Started YARN shuffle service for Spark on port X*

These should be logged at the INFO level. Also, could you verify whether *all* the executors have this problem, or just a subset? If even one of the NMs doesn't have the shuffle service, you'll see the stack trace that you ran into. If the log statements above are missing, it would be good to confirm whether the yarn-site.xml change is actually reflected on all NMs. Let me know if you can get it working. I've recently run the shuffle service myself on the master branch (which will become Spark 1.5.0) following the instructions and have not encountered any problems. -Andrew
Re: Accumulator value 0 in driver
No, I have not. I will try that though, thank you. - Original Message - From: Ted Yu yuzhih...@gmail.com To: dlmarion dlmar...@comcast.net Cc: user user@spark.apache.org Sent: Tuesday, July 21, 2015 12:15:13 PM Subject: Re: Accumulator value 0 in driver Have you called collect() / count() on the RDD following flatMap() ? Cheers On Tue, Jul 21, 2015 at 8:47 AM, dlmar...@comcast.net wrote: I am using Accumulators in a JavaRDD<LabeledPoint>.flatMap() method. I have logged the localValue() of the Accumulator object and their values are non-zero. In the driver, after the .flatMap() method returns, calling value() on the Accumulator yields 0. I am running 1.4.0 in yarn-client mode. Any pointers would be appreciated.
Re: Accumulator value 0 in driver
That did it, thanks. - Original Message - From: dlmar...@comcast.net To: Ted Yu yuzhih...@gmail.com Cc: user user@spark.apache.org Sent: Tuesday, July 21, 2015 1:15:37 PM Subject: Re: Accumulator value 0 in driver No, I have not. I will try that though, thank you. - Original Message - From: Ted Yu yuzhih...@gmail.com To: dlmarion dlmar...@comcast.net Cc: user user@spark.apache.org Sent: Tuesday, July 21, 2015 12:15:13 PM Subject: Re: Accumulator value 0 in driver Have you called collect() / count() on the RDD following flatMap() ? Cheers On Tue, Jul 21, 2015 at 8:47 AM, dlmar...@comcast.net wrote: I am using Accumulators in a JavaRDD<LabeledPoint>.flatMap() method. I have logged the localValue() of the Accumulator object and their values are non-zero. In the driver, after the .flatMap() method returns, calling value() on the Accumulator yields 0. I am running 1.4.0 in yarn-client mode. Any pointers would be appreciated.
Re: LinearRegressionWithSGD Outputs NaN
Hi, Could you please decrease your step size to 0.1, and also try 0.01? You could also try running L-BFGS, which doesn't require step-size tuning, to get better results. Best, Burak On Tue, Jul 21, 2015 at 2:59 AM, Naveen nav...@formcept.com wrote: Hi, I am trying to use LinearRegressionWithSGD on the Million Song Dataset and my model returns NaNs as weights and 0.0 as the intercept. What might be the issue? I am using Spark 1.4.0 in standalone mode. Below is my model:

val numIterations = 100
val stepSize = 1.0
val regParam = 0.01
val regType = "L2"
val algorithm = new LinearRegressionWithSGD()
algorithm.optimizer.setNumIterations(numIterations).setStepSize(stepSize).setRegParam(regParam)
val model = algorithm.run(parsedTrainData)

Regards, Naveen
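Against the snippet above, that is just a one-line change (a minimal sketch):

val algorithm = new LinearRegressionWithSGD()
algorithm.optimizer
  .setNumIterations(100)
  .setStepSize(0.1)   // then try 0.01 if the weights still diverge to NaN
  .setRegParam(0.01)
val model = algorithm.run(parsedTrainData)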
java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path
*java:*
java version "1.7.0_60"
Java(TM) SE Runtime Environment (build 1.7.0_60-b19)
Java HotSpot(TM) 64-Bit Server VM (build 24.60-b09, mixed mode)

*scala:*
Scala code runner version 2.10.5 -- Copyright 2002-2013, LAMP/EPFL

*hadoop cluster:*
51 servers running hadoop-2.3-cdh-5.1.0, with Snappy set up:

ls /home/cluster/apps/hadoop/lib/native/libsnappy*
/home/cluster/apps/hadoop/lib/native/libsnappyjava.so
/home/cluster/apps/hadoop/lib/native/libsnappy.so.1
/home/cluster/apps/hadoop/lib/native/libsnappy.so
/home/cluster/apps/hadoop/lib/native/libsnappy.so.1.1.3

Submitting a Hadoop wordcount job with SnappyCodec runs successfully:

./bin/hadoop jar ~/opt/project/hadoop-demo/out/artifacts/hadoop_demo/hadoop-demo.jar com.hadoop.demo.mapreduce.base.WordCountWithSnappyCodec -Dmapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec /data/hadoop/wordcount/input output13

But submitting a Spark job with yarn-cluster fails:

/home/dp/spark/spark-1.4/spark-1.4.1-test/bin/spark-submit --master yarn-cluster --executor-memory 3g --driver-memory 1g --class org.apache.spark.examples.SparkPi /home/dp/spark/spark-1.4/spark-1.4.1-test/examples/target/spark-examples_2.10-1.4.1.jar 10

PS: As far as I know, Spark uses snappy 1.1.1.7 while hadoop-2.3-cdh-5.1.0 uses snappy 1.0.4.1, so I replaced the hadoop-2.3-cdh-5.1.0 one with snappy 1.1.1.7, but that also does not work. The job fails with the error below:

15/07/22 10:29:55 DEBUG component.AbstractLifeCycle: STARTED o.s.j.s.ServletContextHandler{/metrics/json,null}
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:317)
at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:219)
at org.xerial.snappy.Snappy.<clinit>(Snappy.java:44)
at org.apache.spark.io.SnappyCompressionCodec.<init>(CompressionCodec.scala:150)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
at org.apache.spark.scheduler.EventLoggingListener.<init>(EventLoggingListener.scala:69)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:513)
at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:28)
at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:483)
Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1886)
at java.lang.Runtime.loadLibrary0(Runtime.java:849)
at java.lang.System.loadLibrary(System.java:1088)
at
org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52)
... 23 more
15/07/22 10:29:55 ERROR spark.SparkContext: Error initializing SparkContext.
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
at org.apache.spark.scheduler.EventLoggingListener.<init>(EventLoggingListener.scala:69)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:513)
at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:28)
at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
Re: RowId unique key for Dataframes
That will work. Thanks! zipWithUniqueId() doesn't guarantee contiguous IDs either. Srikanth On Tue, Jul 21, 2015 at 9:48 PM, Burak Yavuz brk...@gmail.com wrote: Would monotonicallyIncreasingId https://github.com/apache/spark/blob/d4c7a7a3642a74ad40093c96c4bf45a62a470605/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L637 work for you? Best, Burak On Tue, Jul 21, 2015 at 4:55 PM, Srikanth srikanth...@gmail.com wrote: Hello, I'm creating dataframes from three CSV files using the spark-csv package. I want to add a unique ID to each row of a dataframe. I'm not sure how withColumn() can be used to achieve this; I need a Long value, not a UUID. One option I found was to create an RDD and use zipWithUniqueId:

sqlContext.textFile(file).
  zipWithUniqueId().
  map { case (d, i) => i.toString + delimiter + d }.
  map(_.split(delimiter)).
  map(s => caseclass(...)).
  toDF().select("field1", "field2")

It's a bit hacky. Is there an easier way to do this on dataframes and use spark-csv? Srikanth
Re: Partition parquet data by ENUM column
On 7/22/15 9:03 AM, Ankit wrote: Thanks a lot Cheng. So it seems even in spark 1.3 and 1.4, parquet ENUMs were treated as Strings in Spark SQL right? So does this mean partitioning for enums already works in previous versions too since they are just treated as strings? It's a little bit complicated. A Thrift/Avro/ProtoBuf ENUM value is represented as a BINARY annotated with the original type ENUM in Parquet. For example, an optional ENUM field e is translated to something like optional BINARY e (ENUM) in Parquet. And the underlying data is always a UTF8 string of the ENUM name. However, the Parquet original type ENUM is not documented, so Spark 1.3 and 1.4 don't recognize the ENUM annotation and just see it as a normal BINARY. (I didn't even notice the existence of ENUM in Parquet before PR #7048...) On the other hand, Spark SQL has a boolean option named spark.sql.parquet.binaryAsString. When this option is set to true, all Parquet BINARY values are considered and converted to UTF8 strings. The original purpose of this option is to work around a bug in Hive, which writes strings as plain Parquet BINARY values without a proper UTF8 annotation. That said, by using sqlContext.setConf("spark.sql.parquet.binaryAsString", "true") in Scala/Java/Python, or SET spark.sql.parquet.binaryAsString=true in SQL, you may read those ENUM values as plain UTF8 strings. Also, is there a good way to verify that the partitioning is being used? I tried explain (where data is partitioned by the type column):

scala> ev.filter("type = 'NON'").explain
== Physical Plan ==
Filter (type#4849 = NON)
 PhysicalRDD [...cols..], MapPartitionsRDD[103] at map at newParquet.scala:573

but that is the same even with non-partitioned data. Do you mean how to verify whether partition pruning is effective? You should be able to see log lines like this:

15/07/22 11:14:35 INFO DataSourceStrategy: Selected 1 partitions out of 3, pruned 66.67% partitions.

On Tue, Jul 21, 2015 at 4:35 PM, Cheng Lian lian.cs@gmail.com wrote: Parquet support for Thrift/Avro/ProtoBuf ENUM types was just added to the master branch: https://github.com/apache/spark/pull/7048 ENUM types are actually not in the Parquet format spec; that's why we didn't have it in the first place. Basically, ENUMs are always treated as UTF8 strings in Spark SQL now. Cheng On 7/22/15 3:41 AM, ankits wrote: Hi, I am using a custom build of spark 1.4 with the parquet dependency upgraded to 1.7. I have thrift data encoded with parquet that I want to partition by a column of type ENUM. The Spark programming guide says partition discovery is only supported for string and numeric columns, so it seems partition discovery won't work out of the box here. Is there any workaround that will allow me to partition by ENUMs? Will hive partitioning help here? I am unfamiliar with Hive, and how it plays into parquet, thrift and spark, so I would appreciate any pointers in the right direction. Thanks.
Re: user threads in executors
I can post multiple items at a time. Data is being read from Kafka and filtered, and after that it's posted. Does foreachPartition load the complete partition in memory, or does it use an iterator over the batch under the hood? If the complete batch is not loaded, will using a custom batch size of 100-200 requests per post help, instead of posting the whole partition? On Wed, Jul 22, 2015 at 12:18 AM, Tathagata Das t...@databricks.com wrote: If you can post multiple items at a time, then use foreachPartition to post the whole partition in a single request. On Tue, Jul 21, 2015 at 9:35 AM, Richard Marscher rmarsc...@localytics.com wrote: You can certainly create threads in a map transformation. We do this to do concurrent DB lookups during one stage for example. I would recommend, however, that you switch to mapPartitions from map, as this allows you to create a fixed-size thread pool to share across items in a partition, as opposed to spawning a future per record in the RDD. On Tue, Jul 21, 2015 at 4:11 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi, Can I create user threads in executors? I have a streaming app where after processing I have a requirement to push events to an external system. Each POST request costs ~90-100 ms. To make the posts parallel, I cannot use the same thread because that is limited by the number of cores available in the system; can I use user threads in a Spark app? I tried creating 2 threads in a map task and it worked. Is there any upper limit on the number of user threads in a Spark executor? Is it a good idea to create user threads in a Spark map task? Thanks
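For what it's worth, foreachPartition hands your function an Iterator, so records are pulled lazily rather than materialized all at once; a sketch of posting in fixed-size batches (the batch size and postBatch are placeholders, and dstream is your input stream):

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // grouped() pulls at most 200 records at a time from the iterator,
    // so the whole partition is never held in memory at once
    records.grouped(200).foreach(batch => postBatch(batch))
  }
}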
Re: RowId unique key for Dataframes
Would monotonicallyIncreasingId https://github.com/apache/spark/blob/d4c7a7a3642a74ad40093c96c4bf45a62a470605/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L637 work for you? Best, Burak On Tue, Jul 21, 2015 at 4:55 PM, Srikanth srikanth...@gmail.com wrote: Hello, I'm creating dataframes from three CSV files using the spark-csv package. I want to add a unique ID to each row of a dataframe. I'm not sure how withColumn() can be used to achieve this; I need a Long value, not a UUID. One option I found was to create an RDD and use zipWithUniqueId:

sqlContext.textFile(file).
  zipWithUniqueId().
  map { case (d, i) => i.toString + delimiter + d }.
  map(_.split(delimiter)).
  map(s => caseclass(...)).
  toDF().select("field1", "field2")

It's a bit hacky. Is there an easier way to do this on dataframes and use spark-csv? Srikanth
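For reference, a minimal sketch of the usage (the column name is illustrative; the generated IDs are unique and increasing but not consecutive):

import org.apache.spark.sql.functions.monotonicallyIncreasingId

val withId = df.withColumn("rowId", monotonicallyIncreasingId())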
Re: Timestamp functions for sqlContext
Hi Rozen, You can get the current time by calling a Java API and then get rowTimestamp via SQL:

val currentTimeStamp = System.currentTimeMillis()
val rowTimestamp = sqlContext.sql("select rowTimestamp from tableName")

and then you can write a function like this:

def diffDate(currentTimeStamp: Long, rowTimeStamp: Long): Int = {
  // here is something you need to do
}

------ Original message ------
From: Tal Rozen t...@scaleka.com
Date: Wed, Jul 22, 2015, 0:04
To: user user@spark.apache.org
Subject: Timestamp functions for sqlContext

Hi, I'm running a query with SQL context where one of the fields is of type java.sql.Timestamp. I'd like to use a function similar to DATEDIFF in MySQL, between the date given in each row, and now. So if I were able to use the same syntax as in MySQL it would be:

val date_diff_df = sqlContext.sql("select DATEDIFF(curdate(), rowTimestamp) date_diff from tableName")

What are the relevant keywords to replace curdate() and DATEDIFF? Thanks
Re: How to share a Map among RDDS?
Either you have to do rdd.collect and then broadcast, or you can do a join. On 22 Jul 2015 07:54, Dan Dong dongda...@gmail.com wrote: Hi, All, I am trying to access a Map from RDDs that are on different compute nodes, but without success. The Map is like:

val map1 = Map("aa" -> 1, "bb" -> 2, "cc" -> 3, ...)

All RDDs will have to check against it to see if the key is in the Map or not, so it seems I have to make the Map itself global. The problem is that if the Map is stored as RDDs and spread across the different nodes, each node will only see a piece of the Map, and the info will not be complete to check against the Map (and then replace the key with the corresponding value). E.g.:

val matches = Vecs.map(term => term.map { case (a, b) => (map1(a), b) })

But if the Map is not an RDD, how do I share it, like sc.broadcast(map1)? Any idea about this? Thanks! Cheers, Dan
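A minimal sketch of the broadcast route, mirroring the snippet above (assuming map1 fits comfortably in driver memory):

val map1 = Map("aa" -> 1, "bb" -> 2, "cc" -> 3)
val bcMap = sc.broadcast(map1)

// every task reads the complete map from the broadcast variable
val matches = Vecs.map(term => term.map { case (a, b) => (bcMap.value(a), b) })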
Re: Partition parquet data by ENUM column
Thanks a lot Cheng. So it seems even in spark 1.3 and 1.4, parquet ENUMs were treated as Strings in Spark SQL right? So does this mean partitioning for enums already works in previous versions too since they are just treated as strings? Also, is there a good way to verify that the partitioning is being used? I tried explain (where data is partitioned by the type column):

scala> ev.filter("type = 'NON'").explain
== Physical Plan ==
Filter (type#4849 = NON)
 PhysicalRDD [...cols..], MapPartitionsRDD[103] at map at newParquet.scala:573

but that is the same even with non-partitioned data. On Tue, Jul 21, 2015 at 4:35 PM, Cheng Lian lian.cs@gmail.com wrote: Parquet support for Thrift/Avro/ProtoBuf ENUM types was just added to the master branch: https://github.com/apache/spark/pull/7048 ENUM types are actually not in the Parquet format spec; that's why we didn't have it in the first place. Basically, ENUMs are always treated as UTF8 strings in Spark SQL now. Cheng On 7/22/15 3:41 AM, ankits wrote: Hi, I am using a custom build of spark 1.4 with the parquet dependency upgraded to 1.7. I have thrift data encoded with parquet that I want to partition by a column of type ENUM. The Spark programming guide says partition discovery is only supported for string and numeric columns, so it seems partition discovery won't work out of the box here. Is there any workaround that will allow me to partition by ENUMs? Will hive partitioning help here? I am unfamiliar with Hive, and how it plays into parquet, thrift and spark, so I would appreciate any pointers in the right direction. Thanks.
Re: Spark-hive parquet schema evolution
Hi Lian, Sorry, I'm new to Spark, so I did not express myself very clearly. I'm concerned about the situation where, let's say, I have a Parquet table with some partitions, and I add a new column A to the parquet schema and write some data with the new schema to a new partition in the table. If I'm not mistaken, if I do sqlContext.read.parquet(table_path).printSchema() it will print the correct schema with the new column A. But if I do a 'describe table' from the SparkSQL CLI I won't see the new column being added. I understand that this is because Hive doesn't support schema evolution. So what is the best way to support CLI queries in this situation? Do I need to manually alter the table every time the underlying schema changes? Thanks On Tue, Jul 21, 2015 at 4:37 PM, Cheng Lian lian.cs@gmail.com wrote: Hey Jerrick, What do you mean by schema evolution with Hive metastore tables? Hive doesn't take schema evolution into account. Could you please give a concrete use case? Are you trying to write Parquet data with extra columns into an existing metastore Parquet table? Cheng On 7/21/15 1:04 AM, Jerrick Hoang wrote: I'm new to Spark, any ideas would be much appreciated! Thanks On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I'm aware of the support for schema evolution via the DataFrame API. Just wondering what would be the best way to go about dealing with schema evolution with Hive metastore tables. So, say I create a table via the SparkSQL CLI, how would I deal with Parquet schema evolution? Thanks, J
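If manually altering the table turns out to be the way to go, a minimal sketch (assuming a HiveContext; the table name my_table is illustrative, and the thread does not confirm this as the recommended approach):

// After writing data with the extra column A to a new partition,
// update the metastore definition so the CLI's `describe table` sees it.
sqlContext.sql("ALTER TABLE my_table ADD COLUMNS (A string)")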
Re: Convert Simple Kafka Consumer to standalone Spark JavaStream Consumer
From what I understand about your code, it is getting data from different partitions of a topic - get all data from partition 1, then from partition 2, etc. Though you have configured it to read from just one partition (topicCount has count = 1). So I am not sure what your intention is: read all partitions serially, or in parallel. If you want to start off with Kafka + Spark Streaming, I strongly suggest reading the Kafka integration guide - https://spark.apache.org/docs/latest/streaming-kafka-integration.html and run the examples for the two ways - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala Since you understand the high-level consumer idea, you may want to start with the first, receiver-based approach, which uses the HL consumer as well, and takes topic counts. On Tue, Jul 21, 2015 at 8:23 AM, Hafsa Asif hafsa.a...@matchinguu.com wrote: Hi, I have a simple high-level Kafka consumer like:

package matchinguu.kafka.consumer;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.*;

public class SimpleHLConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
            consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println();
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }
        if (consumer != null) {
            System.out.println("Shutdown Happens");
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Consumer is now reading messages from producer");
        //String topic = args[0];
        String topic = "test";
        SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic);
        simpleHLConsumer.testConsumer();
    }
}

I want to get my messages through Spark Java Streaming with Kafka integration. Can anyone help me to reform this code so that I can get the same output with Spark Kafka integration? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Convert-Simple-Kafka-Consumer-to-standalone-Spark-JavaStream-Consumer-tp23930.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
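For reference, a minimal sketch of the receiver-based approach in Scala (the linked guide has the Java form); the ZooKeeper address, group and topic are taken from the code above, and the 2-second batch interval is an assumption:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("SimpleHLConsumer")
val ssc = new StreamingContext(conf, Seconds(2))
// Same high-level-consumer semantics: one receiver thread for topic "test".
val messages = KafkaUtils.createStream(ssc, "localhost:2181", "testgroup", Map("test" -> 1))
messages.map(_._2).print()   // the payload, analogous to it.next().message()
ssc.start()
ssc.awaitTermination()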
spark streaming disk hit
Is it fair to say that Storm stream processing is completely in memory, whereas spark streaming would take a disk hit because of how shuffle works? Does spark streaming try to avoid disk usage out of the box? -Abhishek- - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SparkR sqlContext or sc not found in RStudio
I'm sorry, I have no idea why it is failing on your side. I have been using this for a while now and it works fine. All I can say is use version 1.4.0, but I don't think it is going to make a big difference. This is the one which I use; a/b are my directories.

Sys.setenv(SPARK_HOME = "/a/b/spark-1.4.0")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master = "local")
sqlContext <- sparkRSQL.init(sc)

Well, I'm going to ask another basic question: did you try some other version before, from the amplab github etc.? Can you remove the package with remove.packages("SparkR") and run install-dev.sh from the R folder of your SPARK_HOME, and then try again to see if it works. Hopefully, it should work. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-sqlContext-or-sc-not-found-in-RStudio-tp23928p23938.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark thrift server supports timeout?
Hello everyone, Does the Spark thrift server support timeouts? Is there documentation I can reference for questions like these? I know it supports cancel, but I'm not sure about timeout. Thanks, Judy
Re: SparkR sqlContext or sc not found in RStudio
Yep, I saw that in your previous post and I thought it was a typing mistake you made while posting; I never imagined it was done in RStudio. Glad it worked. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-sqlContext-or-sc-not-found-in-RStudio-tp23928p23941.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Add column to DF
Try instead:

import java.util.{Calendar, Date}
import org.apache.spark.sql.functions._

val determineDayPartID = udf((evntStDate: String, evntStHour: String) => {
  val stFormat = new java.text.SimpleDateFormat("yyMMdd")
  val stDateStr: String = evntStDate.substring(2, 8)
  val stDate: Date = stFormat.parse(stDateStr)
  val stHour = evntStHour.substring(1, 3).toDouble + 0.1
  var bucket = Math.ceil(stHour / 3.0).toInt
  val cal: Calendar = Calendar.getInstance
  cal.setTime(stDate)
  var dayOfWeek = cal.get(Calendar.DAY_OF_WEEK)
  if (dayOfWeek == 1) dayOfWeek = 8   // treat Sunday as 8 so the weekend test below catches it
  if (dayOfWeek > 6) bucket = bucket + 8
  bucket
})

input.withColumn("DayPartID", determineDayPartID(col("StartDate"), col("EventStartHour")))
RE: Add column to DF
This is working! Thank you so much :) Stefan Panayotov, PhD Home: 610-355-0919 Cell: 610-517-5586 email: spanayo...@msn.com spanayo...@outlook.com spanayo...@comcast.net From: mich...@databricks.com Date: Tue, 21 Jul 2015 12:08:04 -0700 Subject: Re: Add column to DF To: spanayo...@msn.com CC: user@spark.apache.org Try instead:

import java.util.{Calendar, Date}
import org.apache.spark.sql.functions._

val determineDayPartID = udf((evntStDate: String, evntStHour: String) => {
  val stFormat = new java.text.SimpleDateFormat("yyMMdd")
  val stDateStr: String = evntStDate.substring(2, 8)
  val stDate: Date = stFormat.parse(stDateStr)
  val stHour = evntStHour.substring(1, 3).toDouble + 0.1
  var bucket = Math.ceil(stHour / 3.0).toInt
  val cal: Calendar = Calendar.getInstance
  cal.setTime(stDate)
  var dayOfWeek = cal.get(Calendar.DAY_OF_WEEK)
  if (dayOfWeek == 1) dayOfWeek = 8
  if (dayOfWeek > 6) bucket = bucket + 8
  bucket
})

input.withColumn("DayPartID", determineDayPartID(col("StartDate"), col("EventStartHour")))
Partition parquet data by ENUM column
Hi, I am using a custom build of spark 1.4 with the parquet dependency upgraded to 1.7. I have thrift data encoded with parquet that I want to partition by a column of type ENUM. The Spark programming guide says partition discovery is only supported for string and numeric columns, so it seems partition discovery won't work out of the box here. Is there any workaround that will allow me to partition by ENUMs? Will hive partitioning help here? I am unfamiliar with Hive, and how it plays into parquet, thrift and spark, so I would appreciate any pointers in the right direction. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Partition-parquet-data-by-ENUM-column-tp23939.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Add column to DF
Hi, I am trying to add a column to a data frame that I created based on a JSON file, like this:

val input = hiveCtx.jsonFile("wasb://n...@cmwhdinsightdatastore.blob.core.windows.net/json/*").toDF().persist(StorageLevel.MEMORY_AND_DISK)

I have a function that is generating the values for the new column:

def determineDayPartID(evntStDate: String, evntStHour: String): Int = {
  val stFormat = new java.text.SimpleDateFormat("yyMMdd")
  var stDateStr: String = evntStDate.substring(2, 8)
  val stDate: Date = stFormat.parse(stDateStr)
  val stHour = evntStHour.substring(1, 3).toDouble + 0.1
  var bucket = Math.ceil(stHour / 3.0).toInt
  val cal: Calendar = Calendar.getInstance
  cal.setTime(stDate)
  var dayOfWeek = cal.get(Calendar.DAY_OF_WEEK)
  if (dayOfWeek == 1) dayOfWeek = 8
  if (dayOfWeek > 6) bucket = bucket + 8
  return bucket
}

When I try:

input.withColumn("DayPartID", callUDF(determineDayPartID, IntegerType, col("StartDate"), col("EventStartHour")))

I am getting the error: missing arguments for method determineDayPartID in object rating; follow this method with `_' if you want to treat it as a partially applied function. Can you please help? Stefan Panayotov, PhD Home: 610-355-0919 Cell: 610-517-5586 email: spanayo...@msn.com spanayo...@outlook.com spanayo...@comcast.net
Re: user threads in executors
If you can post multiple items at a time, then use foreachPartition to post the whole partition in a single request. On Tue, Jul 21, 2015 at 9:35 AM, Richard Marscher rmarsc...@localytics.com wrote: You can certainly create threads in a map transformation. We do this to do concurrent DB lookups during one stage, for example. I would recommend, however, that you switch from map to mapPartitions, as this allows you to create a fixed-size thread pool to share across the items on a partition, as opposed to spawning a future per record in the RDD. On Tue, Jul 21, 2015 at 4:11 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi Can I create user threads in executors? I have a streaming app where, after processing, I have a requirement to push events to an external system. Each post request costs ~90-100 ms. To make the posts parallel, I cannot use the same thread because that is limited by the number of cores available in the system; can I use user threads in a Spark app? I tried to create 2 threads in a map task and it worked. Is there any upper limit on the number of user threads in a Spark executor? Is it a good idea to create user threads in a Spark map task? Thanks -- *Richard Marscher* Software Engineer Localytics Localytics.com http://localytics.com/ | Our Blog http://localytics.com/blog | Twitter http://twitter.com/localytics | Facebook http://facebook.com/localytics | LinkedIn http://www.linkedin.com/company/1148792?trk=tyah
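A minimal sketch combining the two suggestions (a fixed-size pool per partition, posting records concurrently); stream and postEvent are placeholders for the app's DStream and its ~90-100 ms HTTP call, and the pool size of 8 is an arbitrary assumption to tune:

import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

stream.foreachRDD { rdd =>
  rdd.foreachPartition { events =>
    val pool = Executors.newFixedThreadPool(8)
    implicit val ec = ExecutionContext.fromExecutorService(pool)
    try {
      // Launch one future per event; toList forces the lazy iterator.
      val posts = events.map(e => Future(postEvent(e))).toList
      posts.foreach(Await.ready(_, 30.seconds))   // don't lose in-flight posts
    } finally {
      pool.shutdown()
    }
  }
}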
How to share a Map among RDDS?
Hi, All, I am trying to access a Map from RDDs that are on different compute nodes, but without success. The Map is like:

val map1 = Map("aa" -> 1, "bb" -> 2, "cc" -> 3, ...)

All RDDs will have to check against it to see if the key is in the Map or not, so it seems I have to make the Map itself global. The problem is that if the Map is stored as RDDs and spread across the different nodes, each node will only see a piece of the Map, and the info will not be complete to check against the Map (and then replace the key with the corresponding value). E.g.:

val matchs = Vecs.map(term => term.map { case (a, b) => (map1(a), b) })

But if the Map is not an RDD, how to share it, like sc.broadcast(map1)? Any idea about this? Thanks! Cheers, Dan
Spark spark.shuffle.memoryFraction has no effect
Hi, I am testing Spark on Amazon EMR using Python and the basic wordcount example shipped with Spark. After running the application, I realized that in Stage 0, reduceByKey(add), around 2.5GB of shuffle is spilled to memory and 4GB of shuffle is spilled to disk. Since in the wordcount example I am not caching or persisting any data, I thought I could increase the performance of this application by giving more shuffle memoryFraction. So, in spark-defaults.conf, I added the following:

spark.storage.memoryFraction 0.2
spark.shuffle.memoryFraction 0.6

However, I am still getting the same performance, and the same amount of shuffle data is being spilled to disk and memory. I validated that Spark is reading these configurations using Spark UI/Environment and I can see my changes. Moreover, I tried setting spark.shuffle.spill to false, and I got the performance I am looking for, with all shuffle data spilled to memory only. So, what am I getting wrong here, and why is the extra shuffle memory fraction not utilized?

*My environment:* Amazon EMR with Spark 1.3.1 running using the -x argument; 1 master node: m3.xlarge; 3 core nodes: m3.xlarge; Application: wordcount.py; Input: 10 .gz files, 90MB each (~350MB unarchived), stored in S3

*Submit command:* /home/hadoop/spark/bin/spark-submit --deploy-mode client /mnt/wordcount.py s3n://input location

*spark-defaults.conf:*
spark.eventLog.enabled false
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
spark.driver.extraJavaOptions -Dspark.driver.log.level=INFO
spark.master yarn
spark.executor.instances 3
spark.executor.cores 4
spark.executor.memory 9404M
spark.default.parallelism 12
spark.eventLog.enabled true
spark.eventLog.dir hdfs:///spark-logs/
spark.storage.memoryFraction 0.2
spark.shuffle.memoryFraction 0.6

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-spark-shuffle-memoryFraction-has-no-affect-tp23944.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: The auxService:spark_shuffle does not exist
Hi Andrew, Thanks for the advice. I didn't see the log in the NodeManager, so apparently, something was wrong with the yarn-site.xml configuration. After digging in more, I realize it was an user error. I'm sharing this with other people so others may know what mistake I have made. When I review the configurations, I notice that there was another property setting yarn.nodemanager.aux-services in mapred-site.xml. It turns out that mapred-site.xml will override the property yarn.nodemanager.aux-services in yarn-site.xml, because of this, spark_shuffle service was never enabled. :( err.. After deleting the redundant invalid properties in mapred-site.xml, it starts working. I see the following logs from the NodeManager. 2015-07-21 21:24:44,046 INFO org.apache.spark.network.yarn.YarnShuffleService: Initializing YARN shuffle service for Spark 2015-07-21 21:24:44,046 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Adding auxiliary service spark_shuffle, spark_shuffle 2015-07-21 21:24:44,264 INFO org.apache.spark.network.yarn.YarnShuffleService: Started YARN shuffle service for Spark on port 7337. Authentication is not enabled. Appreciate all and the pointers where to look at. Thanks, problem solved. Date: Tue, 21 Jul 2015 09:31:50 -0700 Subject: Re: The auxService:spark_shuffle does not exist From: and...@databricks.com To: alee...@hotmail.com CC: zjf...@gmail.com; rp...@njit.edu; user@spark.apache.org Hi Andrew, Based on your driver logs, it seems the issue is that the shuffle service is actually not running on the NodeManagers, but your application is trying to provide a spark_shuffle secret anyway. One way to verify whether the shuffle service is actually started is to look at the NodeManager logs for the following lines: Initializing YARN shuffle service for Spark Started YARN shuffle service for Spark on port X These should be logged under the INFO level. Also, could you verify whether all the executors have this problem, or just a subset? If even one of the NM doesn't have the shuffle service, you'll see the stack trace that you ran into. It would be good to confirm whether the yarn-site.xml change is actually reflected on all NMs if the log statements above are missing. Let me know if you can get it working. I've run the shuffle service myself on the master branch (which will become Spark 1.5.0) recently following the instructions and have not encountered any problems. -Andrew
Re: spark streaming disk hit
Thanks TD - appreciate the response! On Jul 21, 2015, at 1:54 PM, Tathagata Das t...@databricks.com wrote: Most shuffle files are really kept around in the OS's buffer/disk cache, so it is still pretty much in memory. If you are concerned about performance, you have to do a holistic comparison for end-to-end performance. You could take a look at this. https://spark-summit.org/2015/events/towards-benchmarking-modern-distributed-streaming-systems/ On Tue, Jul 21, 2015 at 11:57 AM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: Is it fair to say that Storm stream processing is completely in memory, whereas spark streaming would take a disk hit because of how shuffle works? Does spark streaming try to avoid disk usage out of the box? -Abhishek- - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Class Loading Issue - Spark Assembly and Application Provided
Hi All, I am having a class loading issue: the Spark assembly uses Google Guice internally, and one of the JARs I am using depends on sisu-guice-3.1.0-no_aop.jar. How do I make my classes load first, so this doesn't result in an error, and tell Spark to load its assembly afterwards? Ashish
Re: spark streaming disk hit
Most shuffle files are really kept around in the OS's buffer/disk cache, so it is still pretty much in memory. If you are concerned about performance, you have to do a holistic comparison for end-to-end performance. You could take a look at this. https://spark-summit.org/2015/events/towards-benchmarking-modern-distributed-streaming-systems/ On Tue, Jul 21, 2015 at 11:57 AM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: Is it fair to say that Storm stream processing is completely in memory, whereas spark streaming would take a disk hit because of how shuffle works? Does spark streaming try to avoid disk usage out of the box? -Abhishek- - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark SQL Table Caching
A few questions about caching a table in Spark SQL. 1) Is there any difference between caching the dataframe and the table? df.cache() vs sqlContext.cacheTable("tableName") 2) Do you need to warm up the cache before seeing the performance benefits? Is the cache LRU? Do you need to run some queries on the table before it is cached in memory? 3) Is caching the table much faster than .saveAsTable? I am only seeing a 10%-20% performance increase.
Re: Classifier for Big Data Mining
depends on your data and I guess the time/performance goals you have for both training/prediction, but for a quick answer : yes :) 2015-07-21 11:22 GMT+02:00 Chintan Bhatt chintanbhatt...@charusat.ac.in: Which classifier can be useful for mining massive datasets in spark? Decision Tree can be good choice as per scalability? -- CHINTAN BHATT http://in.linkedin.com/pub/chintan-bhatt/22/b31/336/ Assistant Professor, U P U Patel Department of Computer Engineering, Chandubhai S. Patel Institute of Technology, Charotar University of Science And Technology (CHARUSAT), Changa-388421, Gujarat, INDIA. http://www.charusat.ac.in *Personal Website*: https://sites.google.com/a/ecchanga.ac.in/chintan/
Which memory fraction is Spark using to compute RDDs that are not going to be persisted
I am new to Spark and I understand that Spark divides the executor memory into the following fractions:

*RDD Storage:* which Spark uses to store persisted RDDs via .persist() or .cache(), and which can be defined by setting spark.storage.memoryFraction (default 0.6)

*Shuffle and aggregation buffers:* which Spark uses to store shuffle outputs. It can be defined using spark.shuffle.memoryFraction. If shuffle output exceeds this fraction, then Spark will spill data to disk (default 0.2)

*User code:* Spark uses this fraction to execute arbitrary user code (default 0.2)

I am not mentioning the storage and shuffle safety fractions, for simplicity. My question is: which memory fraction is Spark using to compute and transform RDDs that are not going to be persisted? For example:

lines = sc.textFile("i am a big file.txt")
count = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
count.saveAsTextFile("output")

Here Spark will not load the whole file at once; it will partition the input file and do all these transformations per partition in a single stage. However, which memory fraction does Spark use to load the partitioned lines and compute flatMap() and map()? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Which-memory-fraction-is-Spark-using-to-compute-RDDs-that-are-not-going-to-be-persisted-tp23942.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
NullPointerException inside RDD when calling sc.textFile
I have a number of CSV files and need to combine them into a RDD by part of their filenames. For example, for the below files

$ ls
20140101_1.csv 20140101_3.csv 20140201_2.csv 20140301_1.csv 20140301_3.csv
20140101_2.csv 20140201_1.csv 20140201_3.csv

I need to combine files with names 20140101*.csv into a RDD to work on, and so on. I am using sc.wholeTextFiles to read the entire directory and then grouping the filenames by their patterns to form a string of filenames. I am then passing the string to sc.textFile to open the files as a single RDD. This is the code I have -

val files = sc.wholeTextFiles("*.csv")
val indexed_files = files.map(a => (a._1.split("_")(0), a._1))
val data = indexed_files.groupByKey

data.map { a =>
  var name = a._2.mkString(",")
  (a._1, name)
}

data.foreach { a =>
  var file = sc.textFile(a._2)
  println(file.count)
}

And I get a SparkException - NullPointerException when I try to call textFile. The error stack refers to an Iterator inside the RDD. I am not able to understand the error -

15/07/21 15:37:37 INFO TaskSchedulerImpl: Removed TaskSet 65.0, whose tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 65.0 failed 4 times, most recent failure: Lost task 1.3 in stage 65.0 (TID 115, 10.132.8.10): java.lang.NullPointerException
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:33)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:32)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
 at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)

However, when I do sc.textFile(data.first._2).count in the spark shell, I am able to form the RDD and retrieve the count. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-inside-RDD-when-calling-sc-textFile-tp23943.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
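No reply is recorded in this digest, but the NPE is consistent with using sc inside another RDD's closure: SparkContext exists only on the driver, so it is null inside executor tasks. A minimal sketch of a driver-side rearrangement, under that assumption and reusing the names above:

// Bring the (small) grouped file lists back to the driver, then open
// each group from there; sc is only ever used on the driver.
val grouped = indexed_files.groupByKey
  .mapValues(_.mkString(","))
  .collect()   // one entry per date prefix, safe to collect

grouped.foreach { case (prefix, files) =>
  val rdd = sc.textFile(files)   // sc.textFile accepts a comma-separated list of paths
  println(s"$prefix: ${rdd.count}")
}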
Re: Classifier for Big Data Mining
How do I load a dataset in Apache Spark? And where can I find sources of massive datasets? On Wed, Jul 22, 2015 at 4:50 AM, Ron Gonzalez zlgonza...@yahoo.com.invalid wrote: I'd use Random Forest. It will give you better generalizability. There are also a number of things you can do with RF that allow you to train on samples of the massive data set and then just average over the resulting models... Thanks, Ron On 07/21/2015 02:17 PM, Olivier Girardot wrote: depends on your data and I guess the time/performance goals you have for both training/prediction, but for a quick answer: yes :) 2015-07-21 11:22 GMT+02:00 Chintan Bhatt chintanbhatt...@charusat.ac.in: Which classifier can be useful for mining massive datasets in spark? Decision Tree can be a good choice as per scalability? -- CHINTAN BHATT http://in.linkedin.com/pub/chintan-bhatt/22/b31/336/ Assistant Professor, U P U Patel Department of Computer Engineering, Chandubhai S. Patel Institute of Technology, Charotar University of Science And Technology (CHARUSAT), Changa-388421, Gujarat, INDIA. http://www.charusat.ac.in *Personal Website*: https://sites.google.com/a/ecchanga.ac.in/chintan/ -- CHINTAN BHATT http://in.linkedin.com/pub/chintan-bhatt/22/b31/336/ Assistant Professor, U P U Patel Department of Computer Engineering, Chandubhai S. Patel Institute of Technology, Charotar University of Science And Technology (CHARUSAT), Changa-388421, Gujarat, INDIA. http://www.charusat.ac.in *Personal Website*: https://sites.google.com/a/ecchanga.ac.in/chintan/
Re: How to restart Twitter spark stream
Hi Akhil and Jorn, I tried as you suggested to create some simple scenario, but I have an error on rdd.join(newRDD): value join is not a member of org.apache.spark.rdd.RDD[twitter4j.Status]. The code looks like:

val stream = TwitterUtils.createStream(ssc, auth)
val filteredStream = stream.transform(rdd => {
  val samplehashtags = Array("music", "film")
  val newRDD = samplehashtags.map { x => (x, 1) }
  rdd.join(newRDD)
})

Did I miss something here? Thanks, Zoran On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic zoran.jere...@gmail.com wrote: Thanks for the explanation. If I understand this correctly, in this approach I would actually stream everything from Twitter and perform the filtering in my application using Spark. Isn't this too much overhead if my application is interested in listening for a couple of hundred or thousand hashtags? On one side, this would be the better approach, since I will not have the problem of opening new streams if the number of hashtags goes over 400, which is the Twitter limit for user stream filtering; but on the other side, I'm concerned about how much it will affect application performance if I stream everything that is posted on Twitter and filter it locally. It would be great if somebody with experience on this could comment on these concerns. Thanks, Zoran On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Jorn meant something like this:

val filteredStream = twitterStream.transform(rdd => {
  val newRDD = scc.sc.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))
  rdd.join(newRDD)
})

newRDD will work like a filter when you do the join. Thanks Best Regards On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic zoran.jere...@gmail.com wrote: Hi Jorn, I didn't know that it is possible to change the filter without re-opening the twitter stream. Actually, I already had that question earlier at stackoverflow http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming and I got the answer that it's not possible, but it would be even better if there is some other way to add new hashtags, or to remove old hashtags that a user stopped following. I guess the second request would be more difficult. However, it would be great if you could give me a short example of how to do this. I didn't understand well from your explanation what you meant by "join it with a rdd loading the newest hash tags from disk in a regular interval". Thanks, Zoran On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke jornfra...@gmail.com wrote: Why do you even want to stop it? You can join it with a rdd loading the newest hash tags from disk in a regular interval Le dim. 19 juil. 2015 à 7:40, Zoran Jeremic zoran.jere...@gmail.com a écrit : Hi, I have a twitter spark stream initialized in the following way:

val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
val config = getTwitterConfigurationBuilder.build()
val auth: Option[twitter4j.auth.Authorization] = Some(new twitter4j.auth.OAuthAuthorization(config))
val stream = TwitterUtils.createStream(ssc, auth, filters)

This works fine when I initially start it. However, at some point I need to update the filters, since users might add new hashtags they want to follow. I tried to stop the running stream and the spark streaming context without stopping the spark context, e.g.:

stream.stop()
ssc.stop(false)

Afterward, I'm trying to initialize a new Twitter stream like I did previously.
However, I got this exception:

Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
 at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
 at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
 at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
 at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
 at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
 at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
 at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
 at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
 at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
 at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
 at
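On the join error at the top of this thread: join is only defined on pair RDDs, and transform hands over an RDD[twitter4j.Status]. A minimal sketch, assuming the intent is to keep statuses whose hashtags match the tracked list; the keying by hashtag text is an illustration, not something from the thread:

val filteredStream = stream.transform { rdd =>
  val tracked = rdd.sparkContext.parallelize(Seq("music", "film")).map(tag => (tag, 1))
  rdd.flatMap(status => status.getHashtagEntities.map(h => (h.getText.toLowerCase, status)))
     .join(tracked)                              // keeps only statuses with a tracked tag
     .map { case (_, (status, _)) => status }    // back to plain statuses
}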
Question on Spark SQL for a directory
Hi, Question on using spark sql. Can someone give an example of creating a table from a directory containing parquet files in HDFS, instead of an actual parquet file? Thanks, Ron On 07/21/2015 01:59 PM, Brandon White wrote: A few questions about caching a table in Spark SQL. 1) Is there any difference between caching the dataframe and the table? df.cache() vs sqlContext.cacheTable("tableName") 2) Do you need to warm up the cache before seeing the performance benefits? Is the cache LRU? Do you need to run some queries on the table before it is cached in memory? 3) Is caching the table much faster than .saveAsTable? I am only seeing a 10%-20% performance increase. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Classifier for Big Data Mining
I'd use Random Forest. It will give you better generalizability. There are also a number of things you can do with RF that allow you to train on samples of the massive data set and then just average over the resulting models... Thanks, Ron On 07/21/2015 02:17 PM, Olivier Girardot wrote: depends on your data and I guess the time/performance goals you have for both training/prediction, but for a quick answer: yes :) 2015-07-21 11:22 GMT+02:00 Chintan Bhatt chintanbhatt...@charusat.ac.in mailto:chintanbhatt...@charusat.ac.in: Which classifier can be useful for mining massive datasets in spark? Decision Tree can be a good choice as per scalability? -- CHINTAN BHATT http://in.linkedin.com/pub/chintan-bhatt/22/b31/336/ Assistant Professor, U P U Patel Department of Computer Engineering, Chandubhai S. Patel Institute of Technology, Charotar University of Science And Technology (CHARUSAT), Changa-388421, Gujarat, INDIA. http://www.charusat.ac.in _Personal Website_: https://sites.google.com/a/ecchanga.ac.in/chintan/
Re: Partition parquet data by ENUM column
Parquet support for Thrift/Avro/ProtoBuf ENUM types was just added to the master branch. https://github.com/apache/spark/pull/7048 ENUM types are actually not in the Parquet format spec; that's why we didn't have them in the first place. Basically, ENUMs are always treated as UTF8 strings in Spark SQL now. Cheng On 7/22/15 3:41 AM, ankits wrote: Hi, I am using a custom build of spark 1.4 with the parquet dependency upgraded to 1.7. I have thrift data encoded with parquet that I want to partition by a column of type ENUM. The Spark programming guide says partition discovery is only supported for string and numeric columns, so it seems partition discovery won't work out of the box here. Is there any workaround that will allow me to partition by ENUMs? Will hive partitioning help here? I am unfamiliar with Hive, and how it plays into parquet, thrift and spark, so I would appreciate any pointers in the right direction. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Partition-parquet-data-by-ENUM-column-tp23939.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark-hive parquet schema evolution
Hey Jerrick, What do you mean by schema evolution with Hive metastore tables? Hive doesn't take schema evolution into account. Could you please give a concrete use case? Are you trying to write Parquet data with extra columns into an existing metastore Parquet table? Cheng On 7/21/15 1:04 AM, Jerrick Hoang wrote: I'm new to Spark, any ideas would be much appreciated! Thanks On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang jerrickho...@gmail.com mailto:jerrickho...@gmail.com wrote: Hi all, I'm aware of the support for schema evolution via DataFrame API. Just wondering what would be the best way to go about dealing with schema evolution with Hive metastore tables. So, say I create a table via SparkSQL CLI, how would I deal with Parquet schema evolution? Thanks, J
Re: Question on Spark SQL for a directory
https://spark.apache.org/docs/latest/sql-programming-guide.html#loading-data-programmatically On Tue, Jul 21, 2015 at 4:06 PM, Ron Gonzalez zlgonza...@yahoo.com.invalid wrote: Hi, Question on using spark sql. Can someone give an example for creating table from a directory containing parquet files in HDFS instead of an actual parquet file? Thanks, Ron On 07/21/2015 01:59 PM, Brandon White wrote: A few questions about caching a table in Spark SQL. 1) Is there any difference between caching the dataframe and the table? df.cache() vs sqlContext.cacheTable(tableName) 2) Do you need to warm up the cache before seeing the performance benefits? Is the cache LRU? Do you need to run some queries on the table before it is cached in memory? 3) Is caching the table much faster than .saveAsTable? I am only seeing a 10 %- 20% performance increase. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
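To make that section of the guide concrete, a minimal sketch (the HDFS path here is made up); read.parquet accepts a directory of Parquet part files, not just a single file:

val df = sqlContext.read.parquet("hdfs:///data/events/")   // directory, not a single file
df.registerTempTable("events")
sqlContext.sql("SELECT count(*) FROM events").show()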
Re: dataframes sql order by not total ordering
Thanks, that works a lot better ;)

scala> val results = sqlContext.sql("select movies.title, movierates.maxr, movierates.minr, movierates.cntu from (SELECT ratings.product, max(ratings.rating) as maxr, min(ratings.rating) as minr, count(distinct user) as cntu FROM ratings group by ratings.product) movierates join movies on movierates.product = movies.movieId order by movierates.cntu desc")
results: org.apache.spark.sql.DataFrame = [title: string, maxr: double, minr: double, cntu: bigint]

scala> results.show
title                 maxr  minr  cntu
American Beauty (...  5.0   1.0   3428
Star Wars: Episod...  5.0   1.0   2991
Star Wars: Episod...  5.0   1.0   2990
Star Wars: Episod...  5.0   1.0   2883
Jurassic Park (1993)  5.0   1.0   2672
Saving Private Ry...  5.0   1.0   2653

On Mon, Jul 20, 2015 at 6:09 PM, Michael Armbrust mich...@databricks.com wrote: An ORDER BY needs to be on the outermost query, otherwise subsequent operations (such as the join) could reorder the tuples. On Mon, Jul 20, 2015 at 9:25 AM, Carol McDonald cmcdon...@maprtech.com wrote: the following query on the Movielens dataset is sorting by the count of ratings for a movie. It looks like the results are ordered by partition?

scala> val results = sqlContext.sql("select movies.title, movierates.maxr, movierates.minr, movierates.cntu from (SELECT ratings.product, max(ratings.rating) as maxr, min(ratings.rating) as minr, count(distinct user) as cntu FROM ratings group by ratings.product order by cntu desc) movierates join movies on movierates.product = movies.movieId")
scala> results.take(30).foreach(println)

[Right Stuff, The (1983),5.0,1.0,750] [Lost in Space (1998),5.0,1.0,667] [Dumb Dumber (1994),5.0,1.0,660] [Patch Adams (1998),5.0,1.0,474] [Carlito's Way (1993),5.0,1.0,369] [Rounders (1998),5.0,1.0,345] [Bedknobs and Broomsticks (1971),5.0,1.0,319] [Beverly Hills Ninja (1997),5.0,1.0,232] [Saving Grace (2000),5.0,1.0,186] [Dangerous Minds (1995),5.0,1.0,141] [Death Wish II (1982),5.0,1.0,85] [All Dogs Go to Heaven 2 (1996),5.0,1.0,75] [Repossessed (1990),4.0,1.0,53] [Assignment, The (1997),5.0,1.0,49] [$1,000,000 Duck (1971),5.0,1.0,37] [Stonewall (1995),5.0,1.0,20] [Dog of Flanders, A (1999),5.0,1.0,8] [Frogs for Snakes (1998),3.0,1.0,5] [It's in the Water (1998),3.0,2.0,3] [Twelve Monkeys (1995),5.0,1.0,1511] [Ransom (1996),5.0,1.0,564] [Alice in Wonderland (1951),5.0,1.0,525] [City Slickers II: The Legend of Curly's Gold (1994),5.0,1.0,392] [Eat Drink Man Woman (1994),5.0,1.0,346] [Cube (1997),5.0,1.0,233] [Omega Man, The (1971),5.0,1.0,224] [Stepmom (1998),5.0,1.0,146] [Metro (1997),5.0,1.0,100] [Death Wish 3 (1985),5.0,1.0,72] [Stalker (1979),5.0,1.0,52]
Re: use S3-Compatible Storage with spark
You can add the jar in the classpath, and you can set the property like:

sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.sigmoid.com")

Thanks Best Regards On Mon, Jul 20, 2015 at 9:41 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: Thanks, that is what I was looking for... Any idea where I have to store and reference the corresponding hadoop-aws-2.6.0.jar?: java.io.IOException: No FileSystem for scheme: s3n 2015-07-20 8:33 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Not in the uri, but in the hadoop configuration you can specify it.

<property>
  <name>fs.s3a.endpoint</name>
  <description>AWS S3 endpoint to connect to. An up-to-date list is provided in the AWS Documentation: regions and endpoints. Without this property, the standard region (s3.amazonaws.com) is assumed.</description>
</property>

Thanks Best Regards On Sun, Jul 19, 2015 at 9:13 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: I want to use pithos, where can I specify that endpoint, is it possible in the url? 2015-07-19 17:22 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Could you name the Storage service that you are using? Most of them provide an S3-like REST API endpoint for you to hit. Thanks Best Regards On Fri, Jul 17, 2015 at 2:06 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: Hi, I wonder how to use S3-compatible Storage in Spark? If I'm using the s3n:// url schema, then it will point to amazon; is there a way I can specify the host somewhere? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark streaming 1.3 issues
I'd suggest upgrading to 1.4, as it has better metrics and UI. Thanks Best Regards On Mon, Jul 20, 2015 at 7:01 PM, Shushant Arora shushantaror...@gmail.com wrote: Is coalesce not applicable to kafkaStream? How to do coalesce on a kafka direct stream, since it's not there in the API? Will calling repartition on the direct stream with the number of executors as numPartitions improve performance? Do tasks get launched in 1.3 for partitions which are empty? Does the driver make a call for getting offsets of each partition separately, or does it get all partitions' new offsets in a single call? I mean, will reducing the number of partitions in kafka help improve the performance? On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi 1. I am using spark streaming 1.3 for reading from a kafka queue and pushing events to an external source. I passed in my job 20 executors but it is showing only 6 in the executor tab? When I used high-level streaming 1.2 it was showing 20 executors. My cluster is a 10 node yarn cluster where each node has 8 cores. I am calling the script as:

spark-submit --class classname --num-executors 10 --executor-cores 2 --master yarn-client jarfile

2. On the Streaming UI:

Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
Time since start: 13 minutes 28 seconds
Network receivers: 0
Batch interval: 1 second
Processed batches: 807
Waiting batches: 0
Received records: 0
Processed records: 0

Received records and processed records are always 0, and the speed of processing is slow compared to the high-level API. I am processing the stream using mapPartitions. When I used

directKafkaStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
    @Override
    public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
        // TODO Auto-generated method stub
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
    }
})

it throws an exception: java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges Thanks Shushant
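On the ClassCastException above: the cast has to be applied to the underlying RDD handed directly to foreachRDD, before any transformation re-wraps it; in the Java API that means casting rdd.rdd(), since JavaPairRDD itself does not implement HasOffsetRanges. A minimal sketch of the Scala form, assuming a stream created with KafkaUtils.createDirectStream:

import org.apache.spark.streaming.kafka.HasOffsetRanges

directKafkaStream.foreachRDD { rdd =>
  // Cast first, while this is still the KafkaRDD produced by the direct stream.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
  }
}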
Re: Apache Spark : spark.eventLog.dir on Windows Environment
Do you have HADOOP_HOME, HADOOP_CONF_DIR and hadoop's winutils.exe in the environment? Thanks Best Regards On Mon, Jul 20, 2015 at 5:45 PM, nitinkalra2000 nitinkalra2...@gmail.com wrote: Hi All, I am working on Spark 1.4 in a Windows environment. I have to set the eventLog directory so that I can reopen the Spark UI after the application has finished. But I am not able to set eventLog.dir; it gives an error in the Windows environment. The configuration is:

<entry key="spark.eventLog.enabled" value="true" />
<entry key="spark.eventLog.dir" value="file:///c:/sparklogs" />

The exception I get:

java.io.IOException: Cannot run program cygpath: CreateProcess error=2, The system cannot find the file specified
 at java.lang.ProcessBuilder.start(Unknown Source)
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:206)

I have also tried installing Cygwin, but the error still doesn't go away. Can anybody give any advice on it? I have posted the same question on Stackoverflow as well: http://stackoverflow.com/questions/31468716/apache-spark-spark-eventlog-dir-on-windows-environment Thanks Nitin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-spark-eventLog-dir-on-Windows-Environment-tp23913.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: updateStateByKey schedule time
I also ran into a similar use case. Is this possible? On 15 July 2015 at 18:12, Michel Hubert mich...@vsnsystemen.nl wrote: Hi, I want to implement a time-out mechanism in de updateStateByKey(…) routine. But is there a way the retrieve the time of the start of the batch corresponding to the call to my updateStateByKey routines? Suppose the streaming has build up some delay then a System.currentTimeMillis() will not be the time of the time the batch was scheduled. I want to retrieve the job/task schedule time of the batch for which my updateStateByKey(..) routine is called. Is this possible? With kind regards, Michel Hubert
RE: standalone to connect mysql
That works! Thanks. Can I ask you one further question? How does Spark SQL support insertion? That is to say, if I do:

sqlContext.sql("insert into newStu values ('10', 'a', 1)")

the error is: failure: ``table'' expected but identifier newStu found: insert into newStu values ('10', 'aa', 1). But if I do:

sqlContext.sql(s"insert into Table newStu select * from otherStu")

that works. Is there any document addressing that? Best regards, Jack From: Terry Hole [mailto:hujie.ea...@gmail.com] Sent: Tuesday, 21 July 2015 4:17 PM To: Jack Yang; user@spark.apache.org Subject: Re: standalone to connect mysql Maybe you can try: spark-submit --class sparkwithscala.SqlApp --jars /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar Thanks! -Terry Hi there, I would like to use spark to access the data in mysql. So firstly I tried to run the program using: spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master local[4] /home/myjar.jar which returns me the correct results. Then I tried the standalone version using: spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar (I have mysql-connector-java-5.1.34.jar on all worker nodes.) and the error is: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.157.129): java.sql.SQLException: No suitable driver found for jdbc:mysql://hadoop1:3306/sparkMysqlDB?user=root&password=root I also found a similar problem before in https://jira.talendforge.org/browse/TBD-2244. Is this a bug to be fixed later? Or did I miss anything? Best regards, Jack
user threads in executors
Hi Can I create user threads in executors? I have a streaming app where, after processing, I have a requirement to push events to an external system. Each post request costs ~90-100 ms. To make the posts parallel, I cannot use the same thread because that is limited by the number of cores available in the system; can I use user threads in a Spark app? I tried to create 2 threads in a map task and it worked. Is there any upper limit on the number of user threads in a Spark executor? Is it a good idea to create user threads in a Spark map task? Thanks
Re: Apache Spark : spark.eventLog.dir on Windows Environment
Hi Akhil, I don't have HADOOP_HOME or HADOOP_CONF_DIR, or even winutils.exe. What's the configuration required for this? Where can I get winutils.exe? Thanks and Regards, Nitin Kalra On Tue, Jul 21, 2015 at 1:30 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Do you have HADOOP_HOME, HADOOP_CONF_DIR and hadoop's winutils.exe in the environment? Thanks Best Regards On Mon, Jul 20, 2015 at 5:45 PM, nitinkalra2000 nitinkalra2...@gmail.com wrote: Hi All, I am working on Spark 1.4 in a Windows environment. I have to set the eventLog directory so that I can reopen the Spark UI after the application has finished. But I am not able to set eventLog.dir; it gives an error in the Windows environment. The configuration is:

<entry key="spark.eventLog.enabled" value="true" />
<entry key="spark.eventLog.dir" value="file:///c:/sparklogs" />

The exception I get:

java.io.IOException: Cannot run program cygpath: CreateProcess error=2, The system cannot find the file specified
 at java.lang.ProcessBuilder.start(Unknown Source)
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:206)

I have also tried installing Cygwin, but the error still doesn't go away. Can anybody give any advice on it? I have posted the same question on Stackoverflow as well: http://stackoverflow.com/questions/31468716/apache-spark-spark-eventlog-dir-on-windows-environment Thanks Nitin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-spark-eventLog-dir-on-Windows-Environment-tp23913.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is there more information about the spark shuffle service
To my knowledge, there is no HA for the External Shuffle Service. Cheers On Jul 21, 2015, at 2:16 AM, JoneZhang joyoungzh...@gmail.com wrote: There is a line, "If the service is enabled, Spark executors will fetch shuffle files from the service instead of from each other," in the wiki https://spark.apache.org/docs/1.3.0/job-scheduling.html#graceful-decommission-of-executors Is there more information about the shuffle service? For example: how to deal with the service shutting down; does any redundancy exist? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-more-information-about-spark-shuffer-service-tp23925.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: use S3-Compatible Storage with spark
Which version do you have? - I tried with spark 1.4.1 for hdp 2.6, but there I had the issue that the aws module is not there somehow: java.io.IOException: No FileSystem for scheme: s3n, and the same for s3a: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found - On Spark 1.4.1 for hdp 2.4, the module is there, and it works out of the box for s3n (except for the endpoint). But I get java.io.IOException: No FileSystem for scheme: s3a :-| 2015-07-21 11:09 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Did you try with s3a? It seems it's more of an issue with hadoop. Thanks Best Regards On Tue, Jul 21, 2015 at 2:31 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: It seems to work for the credentials, but the endpoint is ignored..: I've changed it to sc.hadoopConfiguration.set("fs.s3n.endpoint", "test.com") and I continue to get my data from amazon; how can that be? (I also use s3n in my text url) 2015-07-21 9:30 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: You can add the jar in the classpath, and you can set the property like: sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.sigmoid.com") Thanks Best Regards On Mon, Jul 20, 2015 at 9:41 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: Thanks, that is what I was looking for... Any idea where I have to store and reference the corresponding hadoop-aws-2.6.0.jar?: java.io.IOException: No FileSystem for scheme: s3n 2015-07-20 8:33 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Not in the uri, but in the hadoop configuration you can specify it.

<property>
  <name>fs.s3a.endpoint</name>
  <description>AWS S3 endpoint to connect to. An up-to-date list is provided in the AWS Documentation: regions and endpoints. Without this property, the standard region (s3.amazonaws.com) is assumed.</description>
</property>

Thanks Best Regards On Sun, Jul 19, 2015 at 9:13 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: I want to use pithos, where can I specify that endpoint, is it possible in the url? 2015-07-19 17:22 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Could you name the Storage service that you are using? Most of them provide an S3-like REST API endpoint for you to hit. Thanks Best Regards On Fri, Jul 17, 2015 at 2:06 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: Hi, I wonder how to use S3-compatible Storage in Spark? If I'm using the s3n:// url schema, then it will point to amazon; is there a way I can specify the host somewhere? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
LinearRegressionWithSGD Outputs NaN
Hi, I am trying to use LinearRegressionWithSGD on the Million Song Dataset, and my model returns NaNs as weights and 0.0 as the intercept. What might be the issue? I am using Spark 1.4.0 in standalone mode. Below is my model:

val numIterations = 100
val stepSize = 1.0
val regParam = 0.01
val regType = "L2"
val algorithm = new LinearRegressionWithSGD()
algorithm.optimizer.setNumIterations(numIterations).setStepSize(stepSize).setRegParam(regParam)
val model = algorithm.run(parsedTrainData)

Regards, Naveen - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
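No reply is recorded here, but NaN weights from SGD often indicate divergence on unscaled features. A minimal sketch of one common remedy, stated as an assumption rather than a diagnosis of this dataset: standardize the features (and consider a smaller stepSize):

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.regression.LabeledPoint

// Fit a scaler on the training features, then rescale each point.
// withMean = true assumes dense feature vectors.
val scaler = new StandardScaler(withMean = true, withStd = true)
  .fit(parsedTrainData.map(_.features))
val scaled = parsedTrainData.map(p => LabeledPoint(p.label, scaler.transform(p.features)))

val model = algorithm.run(scaled)   // also try e.g. stepSize = 0.01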
Would driver shutdown cause app dead?
Hi all: I am a bit confused about the role of the driver. In our production environment we have a Spark Streaming app running in standalone mode. What we are concerned about is: if the driver shuts down accidentally (host shutdown or whatever), will the app keep running normally? Any explanation would be appreciated!! Cheers
Re: standalone to connect mysql
Jack, You can refer to the Hive SQL syntax if you use HiveContext: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML Thanks! -Terry That works! Thanks. Can I ask you one further question? How does Spark SQL support insertion? That is to say, if I do: sqlContext.sql("insert into newStu values ('10', 'a', 1)") the error is: failure: ``table'' expected but identifier newStu found: insert into newStu values ('10', 'aa', 1). But if I do: sqlContext.sql(s"insert into Table newStu select * from otherStu") that works. Is there any document addressing that? Best regards, Jack *From:* Terry Hole [mailto:hujie.ea...@gmail.com] *Sent:* Tuesday, 21 July 2015 4:17 PM *To:* Jack Yang; user@spark.apache.org *Subject:* Re: standalone to connect mysql Maybe you can try: spark-submit --class sparkwithscala.SqlApp --jars /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar Thanks! -Terry Hi there, I would like to use spark to access the data in mysql. So firstly I tried to run the program using: spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master local[4] /home/myjar.jar which returns me the correct results. Then I tried the standalone version using: spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar (I have mysql-connector-java-5.1.34.jar on all worker nodes.) and the error is: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.157.129): java.sql.SQLException: No suitable driver found for jdbc:mysql://hadoop1:3306/sparkMysqlDB?user=root&password=root I also found a similar problem before in https://jira.talendforge.org/browse/TBD-2244. Is this a bug to be fixed later? Or did I miss anything? Best regards, Jack
Running driver app as a daemon
Hi, I am trying to start a driver app as a daemon using Linux's start-stop-daemon script (I need console detaching, unbuffered STDOUT/STDERR to a logfile, and start/stop using a PID file). I am doing it like this (which works great for the other apps we have):

/sbin/start-stop-daemon -c $USER --background --exec /bin/bash --pidfile ${PIDFILE} --start \
 -- -c stdbuf -o0 /opt/spark-1.4.0-bin-custom-spark/bin/spark-submit --master spark://sparkhost-1:7077 \
 --class boot.Boot /myjar.jar $OPTS 2>&1 | /usr/bin/logger -p local4.notice -t mytag

Can anyone see an issue why this might not work? Jan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
writing/reading multiple Parquet files: Failed to merge incompatible data types StringType and StructType
Hi everyone, I have a pretty challenging problem with reading/writing multiple parquet files with streaming, but let me introduce my data flow: I have a lot of json events streaming into my platform. All of them have the same structure, but fields are mostly optional. Some of the fields are arrays with structs inside. These arrays can be empty, but sometimes they contain data (structs). Now I'm using Spark SQL Streaming to: 0. Stream data from Kafka: val stream = KafkaUtils.createDirectStream ... 1. Read json data into a json dataframe: stream.foreachRDD( rdd => { val dataRdd : RDD[String] = myTransform(rdd) val jsonDf = sql.read.json(dataRdd) 2. Write jsonDf to Parquet files: if (firstRun) { jsonDf.write.parquet("parquet-events") firstRun = false } else { // the table has to exist to be able to append data. jsonDf.write.mode(SaveMode.Append).parquet("parquet-events") } }) All the writing goes fine. It produces multiple files, one per batch of data. But the problem is in reading the data: scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) scala> val events = sqlContext.read.parquet("parquet-events") org.apache.spark.SparkException: Failed to merge incompatible schemas StructType... Caused by: org.apache.spark.SparkException: Failed to merge incompatible data types StringType and StructType(StructField(key,StringType,true), StructField(value,StringType,true)) Indeed the printed schemas contain mismatched types for a few fields, e.g.: StructField(details,ArrayType(StringType,true),true) vs StructField(details,ArrayType(StructType(StructField(key,StringType,true), StructField(value,StringType,true)),true),true) It seems that the bad thing happens in read.json: it infers my array fields differently: when an array is empty, as containing Strings; when filled with data, as containing structs. The code of json/InferSchema indeed suggests that: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala#L127 where the canonicalizeType method replaces NullType with StringType in my empty arrays. This is a serious problem for someone trying to stream data from json to parquet tables. Does anyone have ideas how to handle this problem? My ideas are (some non-exclusive): 1. Have the schema perfectly defined for my data. This is a last resort as I wanted to create a schema-less solution. 2. Write my own schema inference that removes empty arrays from the schema, then pass the schema directly to the read method. I could even modify the InferSchema class from the Spark source, but it is unfortunately private... so I would need to copy-paste it. 3. Submit a bug to Spark about it. Do you also think it is a bug? It's a blocker for me currently; any help will be appreciated! Cheers, Krzysztof
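A sketch of ideas 1/2 above, with placeholder field names: declare the schema up front and hand it to the reader, so an empty array can never be mis-inferred as an array of strings. With a fixed schema every batch writes compatible Parquet files, and the later read needs no schema merging:

import org.apache.spark.sql.types._

// The details element type is declared explicitly instead of being inferred.
val schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("details", ArrayType(StructType(Seq(
    StructField("key", StringType, true),
    StructField("value", StringType, true))), containsNull = true), nullable = true)))
val jsonDf = sql.read.schema(schema).json(dataRdd)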
Re: What else is need to setup native support of BLAS/LAPACK with Spark?
Great, and that file exists on HDFS and is world readable? just double-checking. What classpath is this -- your driver or executor? this is the driver, no? I assume so just because it looks like it references the assembly you built locally and from which you're launching the driver. I think we're concerned with the executors and what they have on the classpath. I suspect there is still a problem somewhere in there. On Mon, Jul 20, 2015 at 4:59 PM, Arun Ahuja aahuj...@gmail.com wrote: Cool, I tried that as well, and doesn't seem different: spark.yarn.jar seems set [image: Inline image 1] This actually doesn't change the classpath, not sure if it should: [image: Inline image 3] But same netlib warning. Thanks for the help! - Arun On Fri, Jul 17, 2015 at 3:18 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Can you try setting the spark.yarn.jar property to make sure it points to the jar you're thinking of? -Sandy On Fri, Jul 17, 2015 at 11:32 AM, Arun Ahuja aahuj...@gmail.com wrote: Yes, it's a YARN cluster and using spark-submit to run. I have SPARK_HOME set to the directory above and using the spark-submit script from there. bin/spark-submit --master yarn-client --executor-memory 10g --driver-memory 8g --num-executors 400 --executor-cores 1 --class org.hammerlab.guacamole.Guacamole --conf spark.default.parallelism=4000 --conf spark.storage.memoryFraction=0.15 libgfortran.so.3 is also there ls /usr/lib64/libgfortran.so.3 /usr/lib64/libgfortran.so.3 These are jniloader files in the jar jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep jniloader META-INF/maven/com.github.fommil/jniloader/ META-INF/maven/com.github.fommil/jniloader/pom.xml META-INF/maven/com.github.fommil/jniloader/pom.properties Thanks, Arun On Fri, Jul 17, 2015 at 1:30 PM, Sean Owen so...@cloudera.com wrote: Make sure /usr/lib64 contains libgfortran.so.3; that's really the issue. I'm pretty sure the answer is 'yes', but, make sure the assembly has jniloader too. I don't see why it wouldn't, but, that's needed. What is your env like -- local, standalone, YARN? how are you running? Just want to make sure you are using this assembly across your cluster. On Fri, Jul 17, 2015 at 6:26 PM, Arun Ahuja aahuj...@gmail.com wrote: Hi Sean, Thanks for the reply! I did double-check that the jar is one I think I am running: [image: Inline image 2] jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native com/github/fommil/netlib/NativeRefARPACK.class com/github/fommil/netlib/NativeRefBLAS.class com/github/fommil/netlib/NativeRefLAPACK.class com/github/fommil/netlib/NativeSystemARPACK.class com/github/fommil/netlib/NativeSystemBLAS.class com/github/fommil/netlib/NativeSystemLAPACK.class Also, I checked the gfortran version on the cluster nodes and it is available and is 5.1 $ gfortran --version GNU Fortran (GCC) 5.1.0 Copyright (C) 2015 Free Software Foundation, Inc. and still see: 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK Does anything need to be adjusted in my application POM? 
Thanks, Arun On Thu, Jul 16, 2015 at 5:26 PM, Sean Owen so...@cloudera.com wrote: Yes, that's most of the work, just getting the native libs into the assembly. netlib can find them from there even if you don't have BLAS libs on your OS, since it includes a reference implementation as a fallback. One common reason it won't load is not having libgfortran installed on your OSes though. It has to be 4.6+ too. That can't be shipped even in netlib and has to exist on your hosts. The other thing I'd double-check is whether you are really using this assembly you built for your job -- like, it's actually the assembly the executors are using. On Tue, Jul 7, 2015 at 8:47 PM, Arun Ahuja aahuj...@gmail.com wrote: Is there more documentation on what is needed to set up BLAS/LAPACK native support with Spark? I’ve built Spark with the -Pnetlib-lgpl flag and see that the netlib classes are in the assembly jar. jar tvf spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native 6625 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefARPACK.class 21123 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefBLAS.class 178334 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefLAPACK.class 6640 Tue Jul 07 15:22:10 EDT 2015
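For reference, the documented way to bundle the netlib-java native glue is the -Pnetlib-lgpl build profile; a sketch of the build command (other profiles as appropriate for your cluster):

mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.6 -DskipTests clean package

As noted in the thread, system-native BLAS additionally requires libgfortran (4.6+) to be present on every host; the jar alone is not enough.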
Re: Using Dataframe write with newHadoopApi
Guys, any help would be great!! I am trying to use a DF and save it to Elasticsearch using the newHadoopApi (because I am using Python). Can anyone tell me if this is even possible? I have managed to use df.rdd to complete the ES integration, but I would prefer df.write. Is it possible or upcoming? On Mon, Jul 20, 2015 at 6:28 PM, ayan guha guha.a...@gmail.com wrote: Update: I have managed to use df.rdd to complete the ES integration, but I would prefer df.write. Is it possible or upcoming? On 18 Jul 2015 23:19, ayan guha guha.a...@gmail.com wrote: Hi I am trying to use a DF and save it to Elasticsearch using the newHadoopApi (because I am using Python). Can anyone tell me if this is even possible? -- Best Regards, Ayan Guha -- Best Regards, Ayan Guha
Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing
Hi Cody, Thanks for your answer. I'm on Spark 1.3.0. I don't quite understand how to use the messageHandler parameter/function in the createDirectStream method. You are referring to this, aren't you? def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag, R: ClassTag](ssc: StreamingContext, kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long], messageHandler: MessageAndMetadata[K, V] => R): InputDStream[R] = { new DirectKafkaInputDStream[K, V, KD, VD, R](ssc, kafkaParams, fromOffsets, messageHandler) } So I must supply the fromOffsets parameter too, but how do I tell this method to read from the beginning of my topic? If I have a filter (e.g. on an R.date field) on my R class, can I put that filter in the messageHandler function too? Regards, Nicolas P. On Mon, Jul 20, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote: Yeah, in the function you supply for the messageHandler parameter to createDirectStream, catch the exception and do whatever makes sense for your application. On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hello, Using the old Spark Streaming Kafka API, I got the following around the same offset: kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633) at kafka.message.Message.ensureValid(Message.scala:166) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message java.lang.IllegalStateException: Iterator is in failed state at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54) at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) I found some old topic about possible corrupt Kafka messages produced by the new producer API with Snappy compression on. My question is: is it possible to skip/ignore those offsets when doing full processing with KafkaUtils.createStream or KafkaUtils.createDirectStream? Regards, Nicolas PHUNG On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger c...@koeninger.org wrote: I'd try logging the offsets for each message, see where problems start, then try using the console consumer starting at those offsets and see if you can reproduce the problem. On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hi Cody, Thanks for your help.
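A minimal sketch of the fromOffsets + messageHandler route, with placeholder names: it assumes a single-partition topic called "events" with String keys and values, and ssc/kafkaParams defined as usual. Offset 0L asks for the start of the retained log (the stream will fail if the log has been truncated past that offset), and the handler swallows messages that fail to decode:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val fromOffsets = Map(TopicAndPartition("events", 0) -> 0L)
// Runs once per message; returning Option lets corrupt records be dropped downstream.
val handler = (mmd: MessageAndMetadata[String, String]) =>
  try { Some(mmd.message()) } catch { case _: Exception => None }
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Option[String]](
  ssc, kafkaParams, fromOffsets, handler)
val values = stream.flatMap(_.toSeq) // keep only the messages that decoded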
It seems there's something wrong with some messages within my Kafka topics then. I don't understand how I can get bigger or incomplete messages, since I use the default configuration that accepts only 1MB messages in my Kafka topic. If you have any other information or suggestions, please tell me. Regards, Nicolas PHUNG On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote: Not exactly the same issue, but possibly related: https://issues.apache.org/jira/browse/KAFKA-1196 On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger c...@koeninger.org wrote: Well, working backwards down the stack trace... at java.nio.Buffer.limit(Buffer.java:275) That exception gets thrown if the limit is negative or greater than the buffer's capacity at kafka.message.Message.sliceDelimited(Message.scala:236) If size had been negative, it would have just returned null, so we know the exception got thrown because the size was greater than the buffer's capacity I haven't seen that before... maybe a corrupted message of some kind? If that problem is reproducible, try providing an explicit argument for messageHandler, with a function that logs the message offset. On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung
Re: standalone to connect mysql
I think I found the answer in the SqlParser.scala file. It looks like the insert syntax Spark uses is different from what we normally use for MySQL. I hope someone can confirm this. Also, I would appreciate it if there is a SQL reference list available. Sent from my iPhone On 21 Jul 2015, at 9:21 pm, Jack Yang j...@uow.edu.au wrote: No. I did not use hiveContext at this stage. I am talking about the embedded SQL syntax for pure Spark SQL. Thanks, mate. On 21 Jul 2015, at 6:13 pm, Terry Hole hujie.ea...@gmail.com wrote: Jack, You can refer to the Hive SQL syntax if you use HiveContext: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML Thanks! -Terry That works! Thanks. Can I ask you one further question? How does Spark SQL support insertion? That is to say, if I did: sqlContext.sql("insert into newStu values ('10', 'a', 1)") the error is: failure: ``table'' expected but identifier newStu found insert into newStu values ('10', 'a', 1) but if I did: sqlContext.sql(s"insert into table newStu select * from otherStu") that works. Is there any document addressing that? Best regards, Jack From: Terry Hole [mailto:hujie.ea...@gmail.com] Sent: Tuesday, 21 July 2015 4:17 PM To: Jack Yang; user@spark.apache.org Subject: Re: standalone to connect mysql Maybe you can try: spark-submit --class sparkwithscala.SqlApp --jars /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar Thanks! -Terry Hi there, I would like to use spark to access the data in mysql. So firstly I tried to run the program using: spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master local[4] /home/myjar.jar that returns me the correct results. Then I tried the standalone version using: spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar (I have mysql-connector-java-5.1.34.jar on all worker nodes.) and the error is: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.157.129): java.sql.SQLException: No suitable driver found for jdbc:mysql://hadoop1:3306/sparkMysqlDB?user=root&password=root I also found a similar problem reported before in https://jira.talendforge.org/browse/TBD-2244. Is this a bug to be fixed later? Or did I miss anything? Best regards, Jack
Re: standalone to connect mysql
No. I did not use hiveContext at this stage. I am talking about the embedded SQL syntax for pure Spark SQL. Thanks, mate. On 21 Jul 2015, at 6:13 pm, Terry Hole hujie.ea...@gmail.com wrote: Jack, You can refer to the Hive SQL syntax if you use HiveContext: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML Thanks! -Terry That works! Thanks. Can I ask you one further question? How does Spark SQL support insertion? That is to say, if I did: sqlContext.sql("insert into newStu values ('10', 'a', 1)") the error is: failure: ``table'' expected but identifier newStu found insert into newStu values ('10', 'a', 1) but if I did: sqlContext.sql(s"insert into table newStu select * from otherStu") that works. Is there any document addressing that? Best regards, Jack From: Terry Hole [mailto:hujie.ea...@gmail.com] Sent: Tuesday, 21 July 2015 4:17 PM To: Jack Yang; user@spark.apache.org Subject: Re: standalone to connect mysql Maybe you can try: spark-submit --class sparkwithscala.SqlApp --jars /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar Thanks! -Terry Hi there, I would like to use spark to access the data in mysql. So firstly I tried to run the program using: spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master local[4] /home/myjar.jar that returns me the correct results. Then I tried the standalone version using: spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar (I have mysql-connector-java-5.1.34.jar on all worker nodes.) and the error is: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.157.129): java.sql.SQLException: No suitable driver found for jdbc:mysql://hadoop1:3306/sparkMysqlDB?user=root&password=root I also found a similar problem reported before in https://jira.talendforge.org/browse/TBD-2244. Is this a bug to be fixed later? Or did I miss anything? Best regards, Jack
Re: k-means iteration not terminate
Thanks for this information, but I use Cloudera Live 5.4.4, and that has only Spark 1.3; a newer version is not available. I don't understand this problem: first it computes some iterations, and then it stops and does nothing. I think the problem is not in the program code. Maybe you know another way to fix the problem? 2015-07-21 10:09 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: It could be a GC pause or something; you need to check in the stages tab and see what is taking time. If you upgrade to Spark 1.4, it has a better UI and DAG visualization which helps you debug better. Thanks Best Regards On Mon, Jul 20, 2015 at 8:21 PM, Pa Rö paul.roewer1...@googlemail.com wrote: hi community, I have written a Spark k-means app. Now I run it on a cluster. My job starts, and at iteration nine or ten the process stops. In the Spark dashboard it is shown as running the whole time, but nothing happens and there are no exceptions. My settings are the following: 1000 input points, k=10, maxIteration=30, a three node cluster (each node has 16GB RAM and an 8-core i7). I use Cloudera Live 5.4.4 with Spark 1.3. Maybe Spark needs more memory, or do I have a wrong setting? best regards, paul
Re: What is the correct syntax of using Spark streamingContext.fileStream()?
Here are two ways of doing that: Without the filter function: JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); With the filter function: JavaPairInputDStream<LongWritable, Text> foo = ssc.fileStream("/tmp/foo", LongWritable.class, Text.class, TextInputFormat.class, new Function<Path, Boolean>() { @Override public Boolean call(Path v1) throws Exception { return Boolean.TRUE; } }, true); Thanks Best Regards On Mon, Jul 20, 2015 at 11:10 PM, unk1102 umesh.ka...@gmail.com wrote: Hi I am trying to find the correct way to use the Spark Streaming API streamingContext.fileStream(String, Class<K>, Class<V>, Class<F>). I tried to find an example but could not find one anywhere in the Spark documentation. I have to stream files in HDFS which are of a custom Hadoop format. JavaPairDStream<Void, MyRecordWritable> input = streamingContext.fileStream("/path/to/hdfs/stream/dir/", Void.class, MyRecordWritable.class, MyInputFormat.class, ??); How do I implement the fourth argument class type Function mentioned as ?? Please guide me; I am new to Spark Streaming. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-correct-syntax-of-using-Spark-streamingContext-fileStream-tp23916.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
DataFrame writer removes fields which is null for all rows
Consider the following code: val df = Seq((1, 3), (2, 3)).toDF("key", "value").registerTempTable("tbl") sqlContext.sql("select key, null as value from tbl").write.format("json").mode(SaveMode.Overwrite).save("test") sqlContext.read.format("json").load("test").printSchema() It shows: root |-- key: long (nullable = true) The field `value` is removed from the schema when saving the DF to a json file, since it is null for all rows. Saving to a parquet file is the same: null fields are dropped! It seems to be the default behavior for DF, but I would like to keep the null fields for schema consistency. Are there some options/configs for this purpose? Thx. -- Hao Ren Data Engineer @ leboncoin Paris, France
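A workaround sketch for the read side, assuming the expected schema is known: pass it explicitly, and the dropped field comes back as an all-null column (the JSON reader fills in null for fields absent from the files):

import org.apache.spark.sql.types._

val expected = StructType(Seq(
  StructField("key", LongType, nullable = true),
  StructField("value", StringType, nullable = true)))
val reloaded = sqlContext.read.format("json").schema(expected).load("test")
reloaded.printSchema() // both key and value are present again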
Re: use S3-Compatible Storage with spark
Did you try with s3a? It seems it's more of an issue with Hadoop. Thanks Best Regards On Tue, Jul 21, 2015 at 2:31 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: It seems to work for the credentials, but the endpoint is ignored...: I've changed it to sc.hadoopConfiguration.set("fs.s3n.endpoint", "test.com") and I continue to get my data from Amazon; how could that be? (I also use s3n in my text url) 2015-07-21 9:30 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: You can add the jar in the classpath, and you can set the property like: sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.sigmoid.com") Thanks Best Regards On Mon, Jul 20, 2015 at 9:41 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: Thanks, that is what I was looking for... Any idea where I have to store and reference the corresponding hadoop-aws-2.6.0.jar?: java.io.IOException: No FileSystem for scheme: s3n 2015-07-20 8:33 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Not in the URI, but in the Hadoop configuration you can specify it: <property> <name>fs.s3a.endpoint</name> <description>AWS S3 endpoint to connect to. An up-to-date list is provided in the AWS Documentation: regions and endpoints. Without this property, the standard region (s3.amazonaws.com) is assumed.</description> </property> Thanks Best Regards On Sun, Jul 19, 2015 at 9:13 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: I want to use pithos; where can I specify that endpoint, is it possible in the url? 2015-07-19 17:22 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Could you name the storage service that you are using? Most of them provide an S3-like REST API endpoint for you to hit. Thanks Best Regards On Fri, Jul 17, 2015 at 2:06 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: Hi, I wonder how to use S3-compatible storage in Spark? If I'm using the s3n:// url schema, it will point to Amazon; is there a way I can specify the host somewhere? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
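For what it's worth, the s3n connector is backed by jets3t and, as far as I know, does not read an endpoint from the Hadoop configuration at all (jets3t is configured through its own jets3t.properties), which would explain why fs.s3n.endpoint is silently ignored. With s3a, endpoint and credentials are ordinary Hadoop properties; a sketch with placeholder values:

sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.example.com")
sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
val data = sc.textFile("s3a://my-bucket/path/to/data")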
Classifier for Big Data Mining
Which classifier is useful for mining massive datasets in Spark? Is a Decision Tree a good choice in terms of scalability? -- CHINTAN BHATT http://in.linkedin.com/pub/chintan-bhatt/22/b31/336/ Assistant Professor, U P U Patel Department of Computer Engineering, Chandubhai S. Patel Institute of Technology, Charotar University of Science And Technology (CHARUSAT), Changa-388421, Gujarat, INDIA. http://www.charusat.ac.in *Personal Website*: https://sites.google.com/a/ecchanga.ac.in/chintan/
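For scale, MLlib's tree-based classifiers are a reasonable starting point since training is data-parallel. A minimal sketch, assuming trainingData is an RDD[LabeledPoint] with 0/1 labels:

import org.apache.spark.mllib.tree.DecisionTree

// Empty categoricalFeaturesInfo map: all features treated as continuous.
val model = DecisionTree.trainClassifier(trainingData, 2, Map[Int, Int](), "gini", 5, 32)

Linear models (e.g. LogisticRegressionWithLBFGS) also scale well and are often a good baseline.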
Re: Broadcast variables in R
I might add to this that I've done the same exercise on Linux (CentOS 6) and there, broadcast variables ARE working. Is this functionality perhaps not exposed on Mac OS X? Or has it to do with the fact there are no native Hadoop libs for Mac? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-in-R-tp23914p23927.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Apache Spark : spark.eventLog.dir on Windows Environment
Here are some resources which will help you with that. - http://stackoverflow.com/questions/19620642/failed-to-locate-the-winutils-binary-in-the-hadoop-binary-path - https://issues.apache.org/jira/browse/SPARK-2356 Thanks Best Regards On Tue, Jul 21, 2015 at 1:57 PM, Nitin Kalra nitinkalra2...@gmail.com wrote: Hi Akhil, I don't have HADOOP_HOME or HADOOP_CONF_DIR, or even winutils.exe. What's the configuration required for this? From where can I get winutils.exe? Thanks and Regards, Nitin Kalra On Tue, Jul 21, 2015 at 1:30 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Do you have HADOOP_HOME, HADOOP_CONF_DIR and hadoop's winutils.exe in the environment? Thanks Best Regards On Mon, Jul 20, 2015 at 5:45 PM, nitinkalra2000 nitinkalra2...@gmail.com wrote: Hi All, I am working with Spark 1.4 on a Windows environment. I have to set the eventLog directory so that I can reopen the Spark UI after the application has finished. But I am not able to set eventLog.dir; it gives an error on Windows. The configuration is: <entry key="spark.eventLog.enabled" value="true" /> <entry key="spark.eventLog.dir" value="file:///c:/sparklogs" /> The exception I get: java.io.IOException: Cannot run program cygpath: CreateProcess error=2, The system cannot find the file specified at java.lang.ProcessBuilder.start(Unknown Source) at org.apache.hadoop.util.Shell.runCommand(Shell.java:206) I have also tried installing Cygwin but the error doesn't go away. Can anybody give any advice on it? I have posted the same question on Stackoverflow as well: http://stackoverflow.com/questions/31468716/apache-spark-spark-eventlog-dir-on-windows-environment Thanks Nitin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-spark-eventLog-dir-on-Windows-Environment-tp23913.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
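A common fix, sketched with placeholder paths: get a winutils.exe built for your Hadoop version (the Stack Overflow link above points to sources), set HADOOP_HOME before launching Spark, and put the binary under its bin directory:

set HADOOP_HOME=C:\hadoop
rem place winutils.exe in C:\hadoop\bin

Spark's Hadoop code shells out to this binary on Windows, which is typically what the cygpath/CreateProcess error is about.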
Spark SQL/DDF's for production
Hi, I want to ask about an issue I have faced while using Spark. I load dataframes from parquet files. Some of the parquet datasets have lots of partitions and around 10 million rows. Running a where id = x query on such a dataframe scans all partitions. When saving the rdd object/parquet there is a partition column, and a where query on that partition column should zero in and open only the possible partitions. Sometimes I need to create an index on other columns too to speed things up. Without indexes I feel it's not production ready. I see there are two parts to this: the ability of Spark SQL to create/use indexes (mentioned as to-be-implemented in the documentation), and Parquet index support (arriving in format v2.0; it is currently v1.8). When can we hope to get index support that Spark SQL/Catalyst can use? Is anyone using Spark SQL in production? How did you handle this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DDF-s-for-production-tp23926.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
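A sketch of the directory-partitioning route available in 1.4, with placeholder column and path names (best with a low-cardinality column such as a date, not a unique id): write the data partitioned by the filter column, and filters on it then prune to the matching directories instead of scanning everything:

df.write.partitionBy("day").parquet("/data/events")
val events = sqlContext.read.parquet("/data/events")
events.where(events("day") === "2015-07-21")

This is directory pruning rather than a true index, but it avoids full scans for queries on the partition column.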
Re: use S3-Compatible Storage with spark
It seems to work for the credentials, but the endpoint is ignored...: I've changed it to sc.hadoopConfiguration.set("fs.s3n.endpoint", "test.com") and I continue to get my data from Amazon; how could that be? (I also use s3n in my text url) 2015-07-21 9:30 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: You can add the jar in the classpath, and you can set the property like: sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.sigmoid.com") Thanks Best Regards On Mon, Jul 20, 2015 at 9:41 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: Thanks, that is what I was looking for... Any idea where I have to store and reference the corresponding hadoop-aws-2.6.0.jar?: java.io.IOException: No FileSystem for scheme: s3n 2015-07-20 8:33 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Not in the URI, but in the Hadoop configuration you can specify it: <property> <name>fs.s3a.endpoint</name> <description>AWS S3 endpoint to connect to. An up-to-date list is provided in the AWS Documentation: regions and endpoints. Without this property, the standard region (s3.amazonaws.com) is assumed.</description> </property> Thanks Best Regards On Sun, Jul 19, 2015 at 9:13 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: I want to use pithos; where can I specify that endpoint, is it possible in the url? 2015-07-19 17:22 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Could you name the storage service that you are using? Most of them provide an S3-like REST API endpoint for you to hit. Thanks Best Regards On Fri, Jul 17, 2015 at 2:06 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: Hi, I wonder how to use S3-compatible storage in Spark? If I'm using the s3n:// url schema, it will point to Amazon; is there a way I can specify the host somewhere? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Is there more information about the Spark shuffle service
There is a statement, "If the service is enabled, Spark executors will fetch shuffle files from the service instead of from each other.", in the docs: https://spark.apache.org/docs/1.3.0/job-scheduling.html#graceful-decommission-of-executors Is there more information about the shuffle service? For example: how should the service shutting down be handled, and does any redundancy exist? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-more-information-about-spark-shuffer-service-tp23925.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Apache Spark : spark.eventLog.dir on Windows Environment
Please see the tail of: https://issues.apache.org/jira/browse/SPARK-2356 On Jul 21, 2015, at 1:27 AM, Nitin Kalra nitinkalra2...@gmail.com wrote: Hi Akhil, I don't have HADOOP_HOME or HADOOP_CONF_DIR, or even winutils.exe. What's the configuration required for this? From where can I get winutils.exe? Thanks and Regards, Nitin Kalra On Tue, Jul 21, 2015 at 1:30 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Do you have HADOOP_HOME, HADOOP_CONF_DIR and hadoop's winutils.exe in the environment? Thanks Best Regards On Mon, Jul 20, 2015 at 5:45 PM, nitinkalra2000 nitinkalra2...@gmail.com wrote: Hi All, I am working with Spark 1.4 on a Windows environment. I have to set the eventLog directory so that I can reopen the Spark UI after the application has finished. But I am not able to set eventLog.dir; it gives an error on Windows. The configuration is: <entry key="spark.eventLog.enabled" value="true" /> <entry key="spark.eventLog.dir" value="file:///c:/sparklogs" /> The exception I get: java.io.IOException: Cannot run program cygpath: CreateProcess error=2, The system cannot find the file specified at java.lang.ProcessBuilder.start(Unknown Source) at org.apache.hadoop.util.Shell.runCommand(Shell.java:206) I have also tried installing Cygwin but the error doesn't go away. Can anybody give any advice on it? I have posted the same question on Stackoverflow as well: http://stackoverflow.com/questions/31468716/apache-spark-spark-eventlog-dir-on-windows-environment Thanks Nitin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-spark-eventLog-dir-on-Windows-Environment-tp23913.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: standalone to connect mysql
Maybe you can try: spark-submit --class sparkwithscala.SqlApp --jars /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar Thanks! -Terry Hi there, I would like to use spark to access the data in mysql. So firstly I tried to run the program using: spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master local[4] /home/myjar.jar that returns me the correct results. Then I tried the standalone version using: spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar (I have mysql-connector-java-5.1.34.jar on all worker nodes.) and the error is: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.157.129): java.sql.SQLException: No suitable driver found for jdbc:mysql://hadoop1:3306/sparkMysqlDB?user=root&password=root I also found a similar problem reported before in https://jira.talendforge.org/browse/TBD-2244. Is this a bug to be fixed later? Or did I miss anything? Best regards, Jack
Spark Application stuck retrying task failed on Java heap space?
Hello, *TL;DR: task crashes with OOM, but application gets stuck in infinite loop retrying the task over and over again instead of failing fast.* Using Spark 1.4.0, standalone, with DataFrames on Java 7. I have an application that does some aggregations. I played around with shuffling settings, which led to the dreaded Java heap space error. See the stack trace at the end of this message. When this happens, I see 10's of executors in EXITED state, a couple in LOADING and one in RUNNING. All of them are retrying the same task all over again, and keep failing with the same Java heap space error. This goes on for hours! Why doesn't the whole application fail, when the individual executors keep failing with the same error? Thanks, Romi K. --- end of the log in a failed task: 15/07/21 11:13:40 INFO executor.Executor: Finished task 117.0 in stage 218.1 (TID 305). 2000 bytes result sent to driver 15/07/21 11:13:41 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 306 15/07/21 11:13:41 INFO executor.Executor: Running task 0.0 in stage 219.1 (TID 306) 15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Updating epoch to 420 and clearing cache 15/07/21 11:13:41 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 8 15/07/21 11:13:41 INFO storage.MemoryStore: ensureFreeSpace(5463) called with curMem=285917, maxMem=1406164008 15/07/21 11:13:41 INFO storage.MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 5.3 KB, free 1340.7 MB) 15/07/21 11:13:41 INFO broadcast.TorrentBroadcast: Reading broadcast variable 8 took 22 ms 15/07/21 11:13:41 INFO storage.MemoryStore: ensureFreeSpace(10880) called with curMem=291380, maxMem=1406164008 15/07/21 11:13:41 INFO storage.MemoryStore: Block broadcast_8 stored as values in memory (estimated size 10.6 KB, free 1340.7 MB) 15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 136, fetching them 15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker endpoint = AkkaRpcEndpointRef(Actor[akka.tcp:// sparkDriver@1.2.3.4:57490/user/MapOutputTracker#-99712578]) 15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Got the output locations 15/07/21 11:13:41 INFO storage.ShuffleBlockFetcherIterator: Getting 182 non-empty blocks out of 182 blocks 15/07/21 11:13:41 INFO storage.ShuffleBlockFetcherIterator: Started 4 remote fetches in 28 ms 15/07/21 11:14:34 ERROR executor.Executor: Exception in task 0.0 in stage 219.1 (TID 306) java.lang.OutOfMemoryError: Java heap space at scala.collection.mutable.ResizableArray$class.ensureSize(ResizableArray.scala:99) at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:47) at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:83) at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:47) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.Sort$$anonfun$doExecute$5$$anonfun$apply$5.apply(basicOperators.scala:192) at org.apache.spark.sql.execution.Sort$$anonfun$doExecute$5$$anonfun$apply$5.apply(basicOperators.scala:190) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) at
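For this particular stack trace (the OOM is in the SQL Sort operator buffering a whole shuffle partition in memory), one mitigation sketch is to raise the number of shuffle partitions so each task sorts less data; the value below is illustrative:

sqlContext.setConf("spark.sql.shuffle.partitions", "800") // default is 200

On the fail-fast question: task retries are bounded by spark.task.maxFailures (default 4), but when the executor JVM itself keeps dying and re-registering, stages can be resubmitted repeatedly, which might explain the hours-long loop.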
1.4.0 classpath issue with spark-submit
I have a spark program that uses dataframes to query hive and I run it both as a spark-shell for exploration and I have a runner class that executes some tasks with spark-submit. I used to run against 1.4.0-SNAPSHOT. Since then 1.4.0 and 1.4.1 were released so I tried to switch to the official release. Now, when I run the program as a shell, everything works but when I try to run it with spark-submit it complains with this error: Exception in thread main java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/ql/session/SessionState when creating Hive client using classpath: file:/home/mharis/dxp-spark.jar Please make sure that jars for your version of hive and hadoop are included in the paths passed to spark.sql.hive.metastore.jars. What is suspicious is firstly 'using classpath: ...' where the jar is my program, i.e. the paths that are passed along with --driver-class-path option are missing. When I switch to an older 1.4.0-SNAPSHOT on the driver, everything works. I observe the issue with 1.4.1. Are there any known obvious changes to how spark-submit handles configuration that I have missed ? -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033 31 Old Nichol Street London E2 7HR
log4j.xml bundled in jar vs log4j.properties in spark/conf
Hi, I have log4j.xml in my jar. Since 1.4.1 it seems that log4j.properties in spark/conf comes first on the classpath, so spark/conf/log4j.properties wins; before that (in v1.3.0) the log4j.xml bundled in the jar defined the configuration. If I manually put my jar strictly first on the classpath (by adding it to SPARK_CLASSPATH in spark-env.sh), the log4j.xml in the jar wins. Does somebody know what changed? Any ideas? ps: I tried to use spark.driver.userClassPathFirst=true and spark.executor.userClassPathFirst=true, however I'm getting strange errors -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/log4j-xml-bundled-in-jar-vs-log4-properties-in-spark-conf-tp23923.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
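One workaround sketch, assuming log4j.xml sits at the root of the jar: point log4j at it explicitly instead of relying on classpath order (-Dlog4j.configuration falls back to a classpath-resource lookup when the value is not a URL):

spark-submit \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.xml" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.xml" \
  ...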
Re: k-means iteration not terminate
It could be a GC pause or something; you need to check in the stages tab and see what is taking time. If you upgrade to Spark 1.4, it has a better UI and DAG visualization which helps you debug better. Thanks Best Regards On Mon, Jul 20, 2015 at 8:21 PM, Pa Rö paul.roewer1...@googlemail.com wrote: hi community, I have written a Spark k-means app. Now I run it on a cluster. My job starts, and at iteration nine or ten the process stops. In the Spark dashboard it is shown as running the whole time, but nothing happens and there are no exceptions. My settings are the following: 1000 input points, k=10, maxIteration=30, a three node cluster (each node has 16GB RAM and an 8-core i7). I use Cloudera Live 5.4.4 with Spark 1.3. Maybe Spark needs more memory, or do I have a wrong setting? best regards, paul
Re: What else is need to setup native support of BLAS/LAPACK with Spark?
Yes, I imagine it's the driver's classpath; I'm pulling those screenshots straight from the Spark UI environment page. Is there somewhere else to grab the executor classpath? Also, the warning is only printed once, so it's not clear whether it comes from the driver or an executor. Would you know? Thanks, Arun On Tue, Jul 21, 2015 at 7:52 AM, Sean Owen so...@cloudera.com wrote: Great, and that file exists on HDFS and is world readable? just double-checking. What classpath is this -- your driver or executor? this is the driver, no? I assume so just because it looks like it references the assembly you built locally and from which you're launching the driver. I think we're concerned with the executors and what they have on the classpath. I suspect there is still a problem somewhere in there. On Mon, Jul 20, 2015 at 4:59 PM, Arun Ahuja aahuj...@gmail.com wrote: Cool, I tried that as well, and it doesn't seem different: spark.yarn.jar seems set [image: Inline image 1] This actually doesn't change the classpath, not sure if it should: [image: Inline image 3] But same netlib warning. Thanks for the help! - Arun On Fri, Jul 17, 2015 at 3:18 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Can you try setting the spark.yarn.jar property to make sure it points to the jar you're thinking of? -Sandy On Fri, Jul 17, 2015 at 11:32 AM, Arun Ahuja aahuj...@gmail.com wrote: Yes, it's a YARN cluster and using spark-submit to run. I have SPARK_HOME set to the directory above and use the spark-submit script from there. bin/spark-submit --master yarn-client --executor-memory 10g --driver-memory 8g --num-executors 400 --executor-cores 1 --class org.hammerlab.guacamole.Guacamole --conf spark.default.parallelism=4000 --conf spark.storage.memoryFraction=0.15 libgfortran.so.3 is also there ls /usr/lib64/libgfortran.so.3 /usr/lib64/libgfortran.so.3 These are the jniloader files in the jar jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep jniloader META-INF/maven/com.github.fommil/jniloader/ META-INF/maven/com.github.fommil/jniloader/pom.xml META-INF/maven/com.github.fommil/jniloader/pom.properties Thanks, Arun On Fri, Jul 17, 2015 at 1:30 PM, Sean Owen so...@cloudera.com wrote: Make sure /usr/lib64 contains libgfortran.so.3; that's really the issue. I'm pretty sure the answer is 'yes', but, make sure the assembly has jniloader too. I don't see why it wouldn't, but, that's needed. What is your env like -- local, standalone, YARN? how are you running? Just want to make sure you are using this assembly across your cluster. On Fri, Jul 17, 2015 at 6:26 PM, Arun Ahuja aahuj...@gmail.com wrote: Hi Sean, Thanks for the reply! I did double-check that the jar is the one I think I am running: [image: Inline image 2] jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native com/github/fommil/netlib/NativeRefARPACK.class com/github/fommil/netlib/NativeRefBLAS.class com/github/fommil/netlib/NativeRefLAPACK.class com/github/fommil/netlib/NativeSystemARPACK.class com/github/fommil/netlib/NativeSystemBLAS.class com/github/fommil/netlib/NativeSystemLAPACK.class Also, I checked the gfortran version on the cluster nodes and it is available and is 5.1 $ gfortran --version GNU Fortran (GCC) 5.1.0 Copyright (C) 2015 Free Software Foundation, Inc.
and still see: 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK Does anything need to be adjusted in my application POM? Thanks, Arun On Thu, Jul 16, 2015 at 5:26 PM, Sean Owen so...@cloudera.com wrote: Yes, that's most of the work, just getting the native libs into the assembly. netlib can find them from there even if you don't have BLAS libs on your OS, since it includes a reference implementation as a fallback. One common reason it won't load is not having libgfortran installed on your OSes though. It has to be 4.6+ too. That can't be shipped even in netlib and has to exist on your hosts. The other thing I'd double-check is whether you are really using this assembly you built for your job -- like, it's actually the assembly the executors are using. On Tue, Jul 7, 2015 at 8:47 PM, Arun Ahuja aahuj...@gmail.com wrote: Is there more documentation on what is needed to set up BLAS/LAPACK native support with Spark? I’ve built Spark with the -Pnetlib-lgpl flag and see that the netlib classes are in the assembly jar.