Re: How to parse multiple event types using Kafka
Each Spark partition will contain messages only from a single Kafka topic/partition. Use HasOffsetRanges to tell which Kafka topic/partition it's from. See the docs: http://spark.apache.org/docs/latest/streaming-kafka-integration.html

On Sun, Aug 23, 2015 at 10:56 AM, Spark Enthusiast sparkenthusi...@yahoo.in wrote: Folks, I use the following Streaming API from KafkaUtils:

public JavaPairInputDStream<String, String> inputDStream() {
  HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
  HashMap<String, String> kafkaParams = new HashMap<String, String>();
  kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(), brokers);
  return KafkaUtils.createDirectStream(streamingContext, String.class, String.class,
      StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
}

I catch the messages using:

JavaDStream<String> messages = inputDStream.map(new Function<Tuple2<String, String>, String>() {
  @Override
  public String call(Tuple2<String, String> tuple2) {
    return tuple2._2();
  }
});

My problem is, each of these Kafka topics streams in different message types. How do I distinguish messages that are of type1 from messages that are of type2, etc.? I tried the following:

private class ParseEvents<T> implements Function<String, T> {
  final Class<T> parameterClass;

  private ParseEvents(Class<T> parameterClass) {
    this.parameterClass = parameterClass;
  }

  public T call(String message) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    T parsedMessage = null;
    try {
      parsedMessage = mapper.readValue(message, this.parameterClass);
    } catch (Exception e1) {
      logger.error("Ignoring Unknown Message %s", message);
    }
    return parsedMessage;
  }
}

JavaDStream<Type1> type1Events = messages.map(new ParseEvents<Type1>(Type1.class));
JavaDStream<Type2> type2Events = messages.map(new ParseEvents<Type2>(Type2.class));
JavaDStream<Type3> type3Events = messages.map(new ParseEvents<Type3>(Type3.class));

But this does not work, because the Type1 parser catches type2 messages and ignores them.
Is there a clean way of handling this?
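One clean pattern (a sketch I'm adding, not from the thread; the topic names and registry below are invented for illustration) is to keep the source topic next to each payload and register one parser per topic, so the Type1 parser never even sees type2 messages. With the direct stream, the topic can be recovered per partition via HasOffsetRanges, or you can simply create one direct stream per topic. The core dispatch logic, independent of Spark:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TopicDispatch {
    // Hypothetical registry: one parser per Kafka topic, so type1's parser
    // never sees type2's payloads.
    static final Map<String, Function<String, String>> PARSERS = new HashMap<>();
    static {
        PARSERS.put("type1-topic", msg -> "Type1(" + msg + ")");
        PARSERS.put("type2-topic", msg -> "Type2(" + msg + ")");
    }

    // Dispatch a (topic, payload) pair to the parser that owns the topic,
    // instead of offering every payload to every parser.
    static String parse(String topic, String payload) {
        Function<String, String> p = PARSERS.get(topic);
        if (p == null) throw new IllegalArgumentException("unknown topic: " + topic);
        return p.apply(payload);
    }

    public static void main(String[] args) {
        System.out.println(parse("type1-topic", "a"));
        System.out.println(parse("type2-topic", "b"));
    }
}
```

In the Spark version, the map function would carry the topic through as part of a pair and filter per stream; the registry idea stays the same.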
Re: Spark streaming multi-tasking during I/O
If you set the concurrentJobs flag to 2, it lets you run two jobs in parallel. It will be a bit hard for you to predict the application behavior with this flag, so debugging would be a headache. Thanks Best Regards

On Sun, Aug 23, 2015 at 10:36 AM, Sateesh Kavuri sateesh.kav...@gmail.com wrote: Hi Akhil, Think of the scenario as running a piece of code in plain Java with multiple threads. Let's say there are 4 threads spawned by a Java process to handle reading from a database, some processing, and storing to a database. In this process, while a thread is performing database I/O, the CPU could allow another thread to perform the processing, thus using the resources efficiently. In the case of Spark, while a node executor is running the same read-from-DB => process-data => store-to-DB flow, during the read-from-DB and store-to-DB phases the CPU is not given to other requests in the queue, since the executor allocates the resources completely to the current ongoing request. Does the flag spark.streaming.concurrentJobs enable this kind of scenario, or is there any other way to achieve what I am looking for? Thanks, Sateesh

On Sat, Aug 22, 2015 at 7:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hmm, for a single-core VM you will have to run it in local mode (specifying master=local[4]). The flag is available in all versions of Spark, I guess.

On Aug 22, 2015 5:04 AM, Sateesh Kavuri sateesh.kav...@gmail.com wrote: Thanks Akhil. Does this mean that the executor running in the VM can spawn two concurrent jobs on the same core? If this is the case, this is what we are looking for. Also, which version of Spark is this flag in? Thanks, Sateesh

On Sat, Aug 22, 2015 at 1:44 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can look at spark.streaming.concurrentJobs; by default it runs a single job. If you set it to 2, it can run 2 jobs in parallel. It's an experimental flag, but go ahead and give it a try.
On Aug 21, 2015 3:36 AM, Sateesh Kavuri sateesh.kav...@gmail.com wrote: Hi, My scenario goes like this: I have an algorithm running in Spark streaming mode on a 4-core virtual machine. The majority of the time, the algorithm does disk I/O and database I/O. The question is: during the I/O, when the CPU is not considerably loaded, is it possible to run any other task/thread so as to efficiently utilize the CPU? Note that one DStream of the algorithm runs completely on a single CPU. Thank you, Sateesh
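The plain-Java behavior Sateesh describes can be sketched directly; this is just the threading scenario from his message, not Spark code, and the task and timing numbers are invented. Two tasks that block on (simulated) I/O overlap on a small thread pool, so the wall-clock time is roughly one task's latency rather than the sum:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OverlapDemo {
    // Simulate two tasks that each spend most of their time blocked on I/O
    // (modeled as sleep). Run them on a 2-thread pool so the blocking phases
    // overlap instead of running back to back.
    static long runBoth() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        long start = System.nanoTime();
        Callable<String> ioTask = () -> { Thread.sleep(200); return "done"; };
        Future<String> a = pool.submit(ioTask);
        Future<String> b = pool.submit(ioTask);
        a.get();
        b.get();
        pool.shutdown();
        return (System.nanoTime() - start) / 1_000_000; // elapsed ms
    }

    public static void main(String[] args) throws Exception {
        // Sequential execution would take roughly 400 ms; overlapped, roughly 200 ms.
        System.out.println("elapsed ms: " + runBoth());
    }
}
```

This is the kind of overlap the thread is asking Spark to provide; spark.streaming.concurrentJobs approximates it at the job level rather than the thread level.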
Re: Error when saving a dataframe as ORC file
Hi Ted, Thanks for the reply. I tried setting both the key ID and access key via sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***") and sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**"). However, the error still occurs for the ORC format. If I change the format to JSON, although the error does not go away, the JSON files can be saved successfully. On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu yuzhih...@gmail.com wrote: You may have seen this: http://search-hadoop.com/m/q3RTtdSyM52urAyI On Aug 23, 2015, at 1:01 AM, lostrain A donotlikeworkingh...@gmail.com wrote: Hi, I'm trying to save a simple dataframe to S3 in ORC format. The code is as follows: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) import sqlContext.implicits._ val df = sc.parallelize(1 to 1000).toDF() df.write.format("orc").save("s3://logs/dummy") I ran the above code in spark-shell and only the _SUCCESS file was saved under the directory. The last part of the spark-shell log said: 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100) 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage 2 (save at console:29) finished in 0.834 s 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at console:29, took 0.895912 s 15/08/23 07:38:24 main INFO LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory: /media/ephemeral0/s3/output- 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78, 4, -23, -103, 9, -104, -20, -8, 66, 126] 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__ committed. Has anyone experienced this before? Thanks!
Re: Error when saving a dataframe as ORC file
Ted, Thanks for the suggestions. Actually I tried both s3n and s3 and the result remains the same. On Sun, Aug 23, 2015 at 12:27 PM, Ted Yu yuzhih...@gmail.com wrote: In your case, I would specify fs.s3.awsAccessKeyId / fs.s3.awsSecretAccessKey since you use the s3 protocol. On Sun, Aug 23, 2015 at 11:03 AM, lostrain A donotlikeworkingh...@gmail.com wrote: Hi Ted, Thanks for the reply. I tried setting both the key ID and access key via sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***") and sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**"). However, the error still occurs for the ORC format. If I change the format to JSON, although the error does not go away, the JSON files can be saved successfully. On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu yuzhih...@gmail.com wrote: You may have seen this: http://search-hadoop.com/m/q3RTtdSyM52urAyI On Aug 23, 2015, at 1:01 AM, lostrain A donotlikeworkingh...@gmail.com wrote: Hi, I'm trying to save a simple dataframe to S3 in ORC format. The code is as follows: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) import sqlContext.implicits._ val df = sc.parallelize(1 to 1000).toDF() df.write.format("orc").save("s3://logs/dummy") I ran the above code in spark-shell and only the _SUCCESS file was saved under the directory.
The last part of the spark-shell log said: 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100) 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage 2 (save at console:29) finished in 0.834 s 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at console:29, took 0.895912 s 15/08/23 07:38:24 main INFO LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory: /media/ephemeral0/s3/output- 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78, 4, -23, -103, 9, -104, -20, -8, 66, 126] 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__ committed. Anyone has experienced this before? Thanks!
B2i Healthcare Powered by Spark addition
Another addition to the Powered by Spark page: B2i Healthcare (http://b2i.sg) uses Spark in healthcare analytics with medical ontologies like SNOMED CT. Our Snow Owl MQ ( http://b2i.sg/snow-owl-mq) product relies on the Spark ecosystem to analyze ~1 billion health records with over 70 healthcare terminologies. An online demo is available at https://mq.b2i.sg. Use cases include: - Creating cohorts to group patients with similar demographic traits, drug exposures, clinical findings, procedures, and observations (Spark Core, Spark SQL, GraphX) - Inspecting patient records to identify trends and correlations (SparkR) - Statistical analysis of patient cohorts to test and verify clinical hypotheses (MLlib) - Identification of potential adverse drug events and interactions with pharmacovigilance signal detection (Streaming) Thanks, Brandon
Re: How to set environment of worker applications
spark-env.sh works for me in Spark 1.4 but not spark.executor.extraJavaOptions. On Sun, Aug 23, 2015 at 11:27 AM Raghavendra Pandey raghavendra.pan...@gmail.com wrote: I think the only way to pass on environment variables to worker node is to write it in spark-env.sh file on each worker node. On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com wrote: Check for spark.driver.extraJavaOptions and spark.executor.extraJavaOptions in the following article. I think you can use -D to pass system vars: spark.apache.org/docs/latest/configuration.html#runtime-environment Hi, I am starting a spark streaming job in standalone mode with spark-submit. Is there a way to make the UNIX environment variables with which spark-submit is started available to the processes started on the worker nodes? Jan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
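To make the distinction in this thread concrete: spark.executor.extraJavaOptions with -D flags sets JVM system properties, while spark-env.sh exports UNIX environment variables, and they are read through different APIs. A minimal illustration (my.setting and MY_SETTING are made-up names; System.setProperty stands in for a -D flag on the command line):

```java
public class PropsVsEnv {
    // A -Dmy.setting=value on the executor JVM command line shows up as a
    // system property; an export in spark-env.sh shows up as an env var.
    static String fromProperty(String key, String fallback) {
        return System.getProperty(key, fallback);
    }

    static String fromEnvironment(String key, String fallback) {
        String v = System.getenv(key);
        return v == null ? fallback : v;
    }

    public static void main(String[] args) {
        System.setProperty("my.setting", "via-D"); // stands in for -Dmy.setting=via-D
        System.out.println(fromProperty("my.setting", "unset"));
        System.out.println(fromEnvironment("MY_SETTING", "unset"));
    }
}
```

So if the worker-side code reads System.getenv, only spark-env.sh on each worker will work; if it reads System.getProperty, the extraJavaOptions route with -D is enough.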
How to parse multiple event types using Kafka
Folks, I use the following Streaming API from KafkaUtils:

public JavaPairInputDStream<String, String> inputDStream() {
  HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
  HashMap<String, String> kafkaParams = new HashMap<String, String>();
  kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(), brokers);
  return KafkaUtils.createDirectStream(streamingContext, String.class, String.class,
      StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
}

I catch the messages using:

JavaDStream<String> messages = inputDStream.map(new Function<Tuple2<String, String>, String>() {
  @Override
  public String call(Tuple2<String, String> tuple2) {
    return tuple2._2();
  }
});

My problem is, each of these Kafka topics streams in different message types. How do I distinguish messages that are of type1 from messages that are of type2, etc.? I tried the following:

private class ParseEvents<T> implements Function<String, T> {
  final Class<T> parameterClass;

  private ParseEvents(Class<T> parameterClass) {
    this.parameterClass = parameterClass;
  }

  public T call(String message) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    T parsedMessage = null;
    try {
      parsedMessage = mapper.readValue(message, this.parameterClass);
    } catch (Exception e1) {
      logger.error("Ignoring Unknown Message %s", message);
    }
    return parsedMessage;
  }
}

JavaDStream<Type1> type1Events = messages.map(new ParseEvents<Type1>(Type1.class));
JavaDStream<Type2> type2Events = messages.map(new ParseEvents<Type2>(Type2.class));
JavaDStream<Type3> type3Events = messages.map(new ParseEvents<Type3>(Type3.class));

But this does not work, because the Type1 parser catches type2 messages and ignores them. Is there a clean way of handling this?
Spark YARN executors are not launching when using +UseG1GC
Hi, I am hitting an issue of long GC pauses in my Spark job, and because of it YARN is killing executors one by one and the Spark job becomes slower and slower. I came across this article where they mention using G1GC: https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html I tried to use the same command but something seems wrong:

./spark-submit --class com.xyz.MySpark --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=512M -XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -Xms25g -Xmx25g -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThread=20" --driver-java-options -XX:MaxPermSize=512m --driver-memory 3g --master yarn-client --executor-memory 25G --executor-cores 8 --num-executors 12 /home/myuser/myspark-1.0.jar

First it said you can't use Xms/Xmx for the executor, so I removed them, but the executors never get launched if I use the above command. Please guide. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-YARN-executors-are-not-launching-when-using-UseG1GC-tp24407.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
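On the Xms/Xmx point: with YARN, the executor heap comes from --executor-memory, so heap flags have to be removed from spark.executor.extraJavaOptions rather than moved elsewhere. (Also worth double-checking: HotSpot's flag is -XX:ConcGCThreads, plural; an unrecognized -XX option makes the JVM refuse to start, which would look exactly like executors never launching.) A hypothetical helper, not a Spark API, showing the kind of cleanup meant:

```java
import java.util.ArrayList;
import java.util.List;

public class CleanExecutorOpts {
    // Drop heap-size flags that YARN-managed executors must not carry;
    // the executor heap is set with --executor-memory instead.
    static String stripHeapFlags(String opts) {
        List<String> kept = new ArrayList<>();
        for (String flag : opts.trim().split("\\s+")) {
            if (flag.startsWith("-Xms") || flag.startsWith("-Xmx")) continue;
            kept.add(flag);
        }
        return String.join(" ", kept);
    }

    public static void main(String[] args) {
        String opts = "-XX:+UseG1GC -Xms25g -Xmx25g -XX:ConcGCThreads=20";
        System.out.println(stripHeapFlags(opts));
    }
}
```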
is there a 'knack' to docker and mesos?
Really excited to try out the new Docker executor support on 1.4.1, I'm making progress but feel like I'm missing something. (versions: spark-1.4.1-hadoop2.6 - not using hadoop yet mac os x yosemite java 8 spark-shell mesos 0.22.1 : 2 slaves, 1 master + zk , all on centos 6.x docker 1.8.x ) I wanted to use a generic docker image with e.g. Java 8 in it, and then deploy the 1.4.1 distro into it. The docs seem to indicate that's supported. Mesos and Docker pull the requested docker images ( 'docker pull java' essentially) and extract the spark.executor.uri distro correctly. It seems to fall over at cd'ing of all places: cd: can't cd to spark-1* this causes the task to fail and eventually have spark blacklist the slave. Is this just because 'spark-1*' matches both the tarball and the directory? full mesos stderr looks like: I0823 19:13:25.608206 3069 fetcher.cpp:214] Fetching URI 'http://d3kbcqa49mib13.cloudfront.net/spark-1.4.1-bin-hadoop2.6.tgz' I0823 19:13:25.608582 3069 fetcher.cpp:125] Fetching URI 'http://d3kbcqa49mib13.cloudfront.net/spark-1.4.1-bin-hadoop2.6.tgz' with os::net I0823 19:13:25.608620 3069 fetcher.cpp:135] Downloading 'http://d3kbcqa49mib13.cloudfront.net/spark-1.4.1-bin-hadoop2.6.tgz' to '/var/mesos/slaves/20150823-110659-1862270986-5050-3230-S1/frameworks/20150823-191138-1862270986-5050-3768-/executors/0/runs/a5a13cd4-013a-4ebc-8ef7-eb9c33358429/spark-1.4.1-bin-hadoop2.6.tgz' I0823 19:14:37.765060 3069 fetcher.cpp:78] Extracted resource '/var/mesos/slaves/20150823-110659-1862270986-5050-3230-S1/frameworks/20150823-191138-1862270986-5050-3768-/executors/0/runs/a5a13cd4-013a-4ebc-8ef7-eb9c33358429/spark-1.4.1-bin-hadoop2.6.tgz' into '/var/mesos/slaves/20150823-110659-1862270986-5050-3230-S1/frameworks/20150823-191138-1862270986-5050-3768-/executors/0/runs/a5a13cd4-013a-4ebc-8ef7-eb9c33358429' /bin/sh: 1: cd: can't cd to spark-1* /bin/sh: 1: ./bin/spark-class: not found I0823 19:14:38.365190 3138 exec.cpp:132] Version: 0.22.1 I0823 19:14:38.369495 
3156 exec.cpp:206] Executor registered on slave 20150823-110659-1862270986-5050-3230-S1 - and the spark-shell says: 15/08/23 20:15:51 INFO CoarseMesosSchedulerBackend: Mesos task 3 is now TASK_RUNNING 15/08/23 20:15:51 INFO CoarseMesosSchedulerBackend: Mesos task 3 is now TASK_FAILED 15/08/23 20:15:51 INFO CoarseMesosSchedulerBackend: Blacklisting Mesos slave value: 20150823-110659-1862270986-5050-3230-S0 due to too many failures; is Spark installed on it?
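One way to test the "spark-1* matches both" hypothesis is to run the glob against the sandbox contents; the sketch below (file names invented to mirror the log) shows the pattern matching both the tarball and the extracted directory, so `cd spark-1*` would expand to two words. Note, though, that /bin/sh printing the unexpanded pattern in the error often means the glob matched nothing at all in the shell's working directory, which would instead point at the container's working directory not being the Mesos sandbox:

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class GlobCheck {
    // After Mesos extracts the distro, the sandbox holds both the tarball
    // and the directory; count how many entries the shell's glob would hit.
    static List<String> matches(String glob, List<String> entries) {
        PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return entries.stream()
                .filter(e -> m.matches(Paths.get(e)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> sandbox = Arrays.asList(
                "spark-1.4.1-bin-hadoop2.6.tgz",
                "spark-1.4.1-bin-hadoop2.6",
                "stderr");
        // Two matches: `cd spark-1*` would not expand to a single directory.
        System.out.println(matches("spark-1*", sandbox));
    }
}
```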
Re: Error when saving a dataframe as ORC file
In your case, I would specify fs.s3.awsAccessKeyId / fs.s3.awsSecretAccessKey since you use the s3 protocol. On Sun, Aug 23, 2015 at 11:03 AM, lostrain A donotlikeworkingh...@gmail.com wrote: Hi Ted, Thanks for the reply. I tried setting both the key ID and access key via sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***") and sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**"). However, the error still occurs for the ORC format. If I change the format to JSON, although the error does not go away, the JSON files can be saved successfully. On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu yuzhih...@gmail.com wrote: You may have seen this: http://search-hadoop.com/m/q3RTtdSyM52urAyI On Aug 23, 2015, at 1:01 AM, lostrain A donotlikeworkingh...@gmail.com wrote: Hi, I'm trying to save a simple dataframe to S3 in ORC format. The code is as follows: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) import sqlContext.implicits._ val df = sc.parallelize(1 to 1000).toDF() df.write.format("orc").save("s3://logs/dummy") I ran the above code in spark-shell and only the _SUCCESS file was saved under the directory. The last part of the spark-shell log said: 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100) 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage 2 (save at console:29) finished in 0.834 s 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at console:29, took 0.895912 s 15/08/23 07:38:24 main INFO LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory: /media/ephemeral0/s3/output- 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78, 4, -23, -103, 9, -104, -20, -8, 66, 126] 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__ committed.
Has anyone experienced this before? Thanks!
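Ted's point is that the Hadoop configuration key embeds the URL scheme, so credentials set under fs.s3n.* are invisible to an s3:// URI. A tiny helper (hypothetical, not a Hadoop API) that derives the matching key pair for whatever scheme the save path uses:

```java
public class S3CredKeys {
    // The Hadoop config keys carry the URL scheme: s3:// reads
    // fs.s3.awsAccessKeyId, s3n:// reads fs.s3n.awsAccessKeyId, etc.
    static String[] keysFor(String scheme) {
        return new String[] {
                "fs." + scheme + ".awsAccessKeyId",
                "fs." + scheme + ".awsSecretAccessKey"
        };
    }

    public static void main(String[] args) {
        for (String k : keysFor("s3")) System.out.println(k);
        for (String k : keysFor("s3n")) System.out.println(k);
    }
}
```

Setting both variants (or setting the pair that matches the scheme in the save URI) rules out a credential-key mismatch before digging into ORC-specific causes.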
Re: Spark Mesos Dispatcher
I'm currently having the same issues. The documentation for the Mesos dispatcher is sparse. I'll also add that I'm able to see the framework running in the Mesos and Spark driver UIs, but when viewing the Spark job UI on a slave, no job is seen. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Mesos-Dispatcher-tp24238p24404.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to set environment of worker applications
I think the only way to pass on environment variables to a worker node is to write them in the spark-env.sh file on each worker node. On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com wrote: Check for spark.driver.extraJavaOptions and spark.executor.extraJavaOptions in the following article. I think you can use -D to pass system vars: spark.apache.org/docs/latest/configuration.html#runtime-environment Hi, I am starting a Spark streaming job in standalone mode with spark-submit. Is there a way to make the UNIX environment variables with which spark-submit is started available to the processes started on the worker nodes? Jan
Error when saving a dataframe as ORC file
Hi, I'm trying to save a simple dataframe to S3 in ORC format. The code is as follows:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
val df = sc.parallelize(1 to 1000).toDF()
df.write.format("orc").save("s3://logs/dummy")

I ran the above code in spark-shell and only the _SUCCESS file was saved under the directory. The last part of the spark-shell log said:

15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100)
15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage 2 (save at console:29) finished in 0.834 s
15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at console:29, took 0.895912 s
15/08/23 07:38:24 main INFO LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory: /media/ephemeral0/s3/output-
15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78, 4, -23, -103, 9, -104, -20, -8, 66, 126]
15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__ committed.

Has anyone experienced this before? Thanks!
Re: Error when saving a dataframe as ORC file
You may have seen this: http://search-hadoop.com/m/q3RTtdSyM52urAyI On Aug 23, 2015, at 1:01 AM, lostrain A donotlikeworkingh...@gmail.com wrote: Hi, I'm trying to save a simple dataframe to S3 in ORC format. The code is as follows: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) import sqlContext.implicits._ val df = sc.parallelize(1 to 1000).toDF() df.write.format("orc").save("s3://logs/dummy") I ran the above code in spark-shell and only the _SUCCESS file was saved under the directory. The last part of the spark-shell log said: 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100) 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage 2 (save at console:29) finished in 0.834 s 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at console:29, took 0.895912 s 15/08/23 07:38:24 main INFO LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory: /media/ephemeral0/s3/output- 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78, 4, -23, -103, 9, -104, -20, -8, 66, 126] 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__ committed. Has anyone experienced this before? Thanks!
Re: How to set environment of worker applications
Check for spark.driver.extraJavaOptions and spark.executor.extraJavaOptions in the following article. I think you can use -D to pass system vars: spark.apache.org/docs/latest/configuration.html#runtime-environment Hi, I am starting a spark streaming job in standalone mode with spark-submit. Is there a way to make the UNIX environment variables with which spark-submit is started available to the processes started on the worker nodes? Jan
How to set environment of worker applications
Hi, I am starting a spark streaming job in standalone mode with spark-submit. Is there a way to make the UNIX environment variables with which spark-submit is started available to the processes started on the worker nodes? Jan
Re: Spark Mesos Dispatcher
Hi Bcjaes, Sorry, I didn't see the previous thread, so I'm not sure what issues you are running into. In cluster mode the driver logs and results are all available through the Mesos UI; you need to look at terminated frameworks if it's a job that's already finished. I'll try to add more docs as we are still completing some other features around cluster mode on Mesos. Tim On Aug 23, 2015, at 7:22 AM, bcajes brian.ca...@gmail.com wrote: I'm currently having the same issues. The documentation for the Mesos dispatcher is sparse. I'll also add that I'm able to see the framework running in the Mesos and Spark driver UIs, but when viewing the Spark job UI on a slave, no job is seen. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Mesos-Dispatcher-tp24238p24404.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Sql behaves strangely with tables with a lot of partitions
Anybody have any suggestions?

On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Is there a workaround without updating Hadoop? Would really appreciate it if someone could explain what Spark is trying to do here and what an easy way to turn this off is. Thanks all!

On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: Did you try with Hadoop version 2.7.1? It is known that s3a works really well with Parquet, and it is available in 2.7. They fixed a lot of issues related to metadata reading there...

On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote: @Cheng, Hao: The physical plan shows that it got stuck scanning S3! (The table is partitioned by date_prefix and hour.)

explain select count(*) from test_table where date_prefix='20150819' and hour='00';

TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], value=[(count(1),mode=Partial,isDistinct=false)]
   Scan ParquetRelation[ .. about 1000 partition paths go here ]

Why does Spark have to scan all partitions when the query only concerns 1 partition? Doesn't it defeat the purpose of partitioning? Thanks!

On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com wrote: I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and I couldn't find much information about it online. What does it mean exactly to disable it? Are there any negative consequences to disabling it?

On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote: Can you do some more profiling? I am wondering if the driver is busy scanning the HDFS / S3, like jstack <pid of driver process>. Also, it would be great if you could paste the physical plan for the simple query.
From: Jerrick Hoang [mailto:jerrickho...@gmail.com] Sent: Thursday, August 20, 2015 1:46 PM To: Cheng, Hao Cc: Philip Weaver; user Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions

I cloned from TOT after the 1.5.0 cutoff. I noticed there were a couple of CLs trying to speed up Spark SQL with tables with a huge number of partitions; I've made sure that those CLs are included, but it's still very slow.

On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote: Yes, you can try setting spark.sql.sources.partitionDiscovery.enabled to false. BTW, which version are you using? Hao

From: Jerrick Hoang [mailto:jerrickho...@gmail.com] Sent: Thursday, August 20, 2015 12:16 PM To: Philip Weaver Cc: user Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions

I guess the question is: why does Spark have to do partition discovery with all partitions when the query only needs to look at one partition? Is there a conf flag to turn this off?

On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com wrote: I've had the same problem. It turns out that Spark (specifically Parquet) is very slow at partition discovery. It got better in 1.5 (not yet released), but was still unacceptably slow. Sadly, we ended up reading Parquet files manually in Python (via C++) and had to abandon Spark SQL because of this problem.

On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I did a simple experiment with Spark SQL. I created a partitioned Parquet table with only one partition (date=20140701). A simple `select count(*) from table where date=20140701` would run very fast (0.1 seconds). However, as I added more partitions, the query took longer and longer. When I added about 10,000 partitions, the query took way too long. I feel like querying a single partition should not be affected by having more partitions. Is this a known behaviour? What does Spark try to do here?
Thanks, Jerrick
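What the thread is asking Spark to do can be sketched in a few lines: prune the partition directory list with the query predicate before listing any files under the pruned-out directories. The path layout below mirrors the date_prefix/hour partitioning from the thread; the paths and helper are otherwise invented for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PartitionPrune {
    // Prune partition paths of the form .../date_prefix=X/hour=Y by the
    // query predicate before touching the filesystem, instead of listing
    // files under every partition first.
    static List<String> prune(List<String> partitions, String datePrefix, String hour) {
        String want = "date_prefix=" + datePrefix + "/hour=" + hour;
        return partitions.stream()
                .filter(p -> p.endsWith(want))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> parts = Arrays.asList(
                "s3://logs/test_table/date_prefix=20150819/hour=00",
                "s3://logs/test_table/date_prefix=20150819/hour=01",
                "s3://logs/test_table/date_prefix=20150818/hour=00");
        // Only one of the three partitions survives the predicate.
        System.out.println(prune(parts, "20150819", "00"));
    }
}
```

With 10,000 partitions, pruning first means one directory listing instead of 10,000, which is the gap the benchmark in the thread is measuring.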
Re: Error when saving a dataframe as ORC file
SPARK-8458 is in the 1.4.1 release. You can upgrade to 1.4.1 or wait for the upcoming 1.5.0 release. On Sun, Aug 23, 2015 at 2:05 PM, lostrain A donotlikeworkingh...@gmail.com wrote: Hi Zhan, Thanks for the pointer. Yes, I'm using a cluster with spark-1.4.0 and it looks like this is most likely the reason. I'll verify this again once we make the upgrade. Best, los On Sun, Aug 23, 2015 at 1:25 PM, Zhan Zhang zzh...@hortonworks.com wrote: If you are using spark-1.4.0, it is probably caused by SPARK-8458 (https://issues.apache.org/jira/browse/SPARK-8458) Thanks. Zhan Zhang On Aug 23, 2015, at 12:49 PM, lostrain A donotlikeworkingh...@gmail.com wrote: Ted, Thanks for the suggestions. Actually I tried both s3n and s3 and the result remains the same. On Sun, Aug 23, 2015 at 12:27 PM, Ted Yu yuzhih...@gmail.com wrote: In your case, I would specify fs.s3.awsAccessKeyId / fs.s3.awsSecretAccessKey since you use the s3 protocol. On Sun, Aug 23, 2015 at 11:03 AM, lostrain A donotlikeworkingh...@gmail.com wrote: Hi Ted, Thanks for the reply. I tried setting both the key ID and access key via sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***") and sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**"). However, the error still occurs for the ORC format. If I change the format to JSON, although the error does not go away, the JSON files can be saved successfully. On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu yuzhih...@gmail.com wrote: You may have seen this: http://search-hadoop.com/m/q3RTtdSyM52urAyI On Aug 23, 2015, at 1:01 AM, lostrain A donotlikeworkingh...@gmail.com wrote: Hi, I'm trying to save a simple dataframe to S3 in ORC format. The code is as follows: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) import sqlContext.implicits._ val df = sc.parallelize(1 to 1000).toDF() df.write.format("orc").save("s3://logs/dummy") I ran the above code in spark-shell and only the _SUCCESS file was saved under the directory.
The last part of the spark-shell log said: 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100) 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage 2 (save at console:29) finished in 0.834 s 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at console:29, took 0.895912 s 15/08/23 07:38:24 main INFO LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory: /media/ephemeral0/s3/output- 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78, 4, -23, -103, 9, -104, -20, -8, 66, 126] 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__ committed. Anyone has experienced this before? Thanks!
Re: Error when saving a dataframe as ORC file
If you are using spark-1.4.0, probably it is caused by SPARK-8458 (https://issues.apache.org/jira/browse/SPARK-8458) Thanks. Zhan Zhang On Aug 23, 2015, at 12:49 PM, lostrain A donotlikeworkingh...@gmail.com wrote: Ted, Thanks for the suggestions. Actually I tried both s3n and s3 and the result remains the same. On Sun, Aug 23, 2015 at 12:27 PM, Ted Yu yuzhih...@gmail.com wrote: In your case, I would specify fs.s3.awsAccessKeyId / fs.s3.awsSecretAccessKey since you use the s3 protocol. On Sun, Aug 23, 2015 at 11:03 AM, lostrain A donotlikeworkingh...@gmail.com wrote: Hi Ted, Thanks for the reply. I tried setting both the key ID and access key via sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***") and sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**"). However, the error still occurs for the ORC format. If I change the format to JSON, although the error does not go away, the JSON files can be saved successfully. On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu yuzhih...@gmail.com wrote: You may have seen this: http://search-hadoop.com/m/q3RTtdSyM52urAyI On Aug 23, 2015, at 1:01 AM, lostrain A donotlikeworkingh...@gmail.com wrote: Hi, I'm trying to save a simple dataframe to S3 in ORC format. The code is as follows: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) import sqlContext.implicits._ val df = sc.parallelize(1 to 1000).toDF() df.write.format("orc").save("s3://logs/dummy") I ran the above code in spark-shell and only the _SUCCESS file was saved under the directory.
Re: Error when saving a dataframe as ORC file
Hi Zhan, thanks for the pointer. Yes, I'm using a cluster with spark-1.4.0 and it looks like this is most likely the reason. I'll verify this again once we make the upgrade. Best, los

On Sun, Aug 23, 2015 at 1:25 PM, Zhan Zhang zzh...@hortonworks.com wrote: If you are using spark-1.4.0, probably it is caused by SPARK-8458 (https://issues.apache.org/jira/browse/SPARK-8458). Thanks. Zhan Zhang

On Aug 23, 2015, at 12:49 PM, lostrain A donotlikeworkingh...@gmail.com wrote: Ted, thanks for the suggestions. Actually I tried both s3n and s3 and the result remains the same.

On Sun, Aug 23, 2015 at 12:27 PM, Ted Yu yuzhih...@gmail.com wrote: In your case, I would specify fs.s3.awsAccessKeyId / fs.s3.awsSecretAccessKey since you use the s3 protocol.

On Sun, Aug 23, 2015 at 11:03 AM, lostrain A donotlikeworkingh...@gmail.com wrote: Hi Ted, thanks for the reply. I tried setting both the key id and the access key via

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**")

However, the error still occurs for the ORC format. If I change the format to JSON, the files can be saved successfully, although the error does not go away.

On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu yuzhih...@gmail.com wrote: You may have seen this: http://search-hadoop.com/m/q3RTtdSyM52urAyI

On Aug 23, 2015, at 1:01 AM, lostrain A donotlikeworkingh...@gmail.com wrote: Hi, I'm trying to save a simple dataframe to S3 in ORC format. The code is as follows:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
val df = sc.parallelize(1 to 1000).toDF()
df.write.format("orc").save("s3://logs/dummy")

I ran the above code in spark-shell and only the _SUCCESS file was saved under the directory.
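A symptom like the one in this thread, where a job commits "successfully" but the output directory contains only a _SUCCESS marker, can be caught with a simple listing check before downstream jobs consume the path. A minimal Python sketch (a hypothetical helper, not part of Spark or Hadoop):

```python
def write_looks_empty(filenames):
    """Return True if a listing has a _SUCCESS marker but no part files.

    Spark writes one or more 'part-*' data files per task plus a _SUCCESS
    marker on commit; a listing containing only _SUCCESS (as reported in
    this thread for the ORC write to S3) means the job committed without
    persisting any data files.
    """
    names = set(filenames)
    has_success = "_SUCCESS" in names
    has_parts = any(n.startswith("part-") for n in names)
    return has_success and not has_parts

# The situation reported above: only _SUCCESS under the output directory.
print(write_looks_empty(["_SUCCESS"]))                    # True
# A healthy ORC write has part files alongside the marker.
print(write_looks_empty(["_SUCCESS", "part-00000.orc"]))  # False
```

This is only a detection aid; the underlying cause in this thread is the version-specific bug (SPARK-8458) discussed above.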
Re: SparkSQL concerning materials
Here's a longer version of that talk that I gave, which goes into more detail on the internals: http://www.slideshare.net/databricks/spark-sql-deep-dive-melbroune

On Fri, Aug 21, 2015 at 8:28 AM, Sameer Farooqui same...@databricks.com wrote: Have you seen the Spark SQL paper?: https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf

On Thu, Aug 20, 2015 at 11:35 PM, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote: Hi, thanks for the answers. I have read the answers you provided, but I am rather looking for some materials on the internals, e.g. how the optimizer works, how the query is translated into RDD operations, etc. The API I am quite familiar with. A good starting point for me was: Spark DataFrames: Simple and Fast Analysis of Structured Data https://www.brighttalk.com/webcast/12891/166495?utm_campaign=child-community-webcasts-feedutm_content=Big+Data+and+Data+Managementutm_source=brighttalk-portalutm_medium=webutm_term=

2015-08-20 18:29 GMT+02:00 Dhaval Patel dhaval1...@gmail.com: Or if you're a Python lover then this is a good place: https://spark.apache.org/docs/1.4.1/api/python/pyspark.sql.html#

On Thu, Aug 20, 2015 at 10:58 AM, Ted Yu yuzhih...@gmail.com wrote: See also http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package Cheers

On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif muhammadatif...@gmail.com wrote: Hi Dawid, the best place to get started is the Spark SQL Guide from Apache: http://spark.apache.org/docs/latest/sql-programming-guide.html Regards, Muhammad

On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote: Hi, I would like to dip into SparkSQL. Get to know better the architecture, good practices, some internals. Could you advise me some materials on this matter? Regards, Dawid
Re: Spark Sql behaves strangely with tables with a lot of partitions
1 minute to discover 1000s of partitions -- yes, that is what I have observed. And I would assert that is very slow.

On Sun, Aug 23, 2015 at 7:16 PM, Michael Armbrust mich...@databricks.com wrote: We should not be actually scanning all of the data of all of the partitions, but we do need to at least list all of the available directories so that we can apply your predicates to the actual values that are present when we are deciding which files need to be read in a given Spark job. While this is a somewhat expensive operation, we do it in parallel and we cache this information when you access the same relation more than once. Can you provide a little more detail about how exactly you are accessing the parquet data (are you using sqlContext.read or creating persistent tables in the metastore?), and how long it is taking? It would also be good to know how many partitions we are talking about and how much data is in each. Finally, I'd like to see the stack trace where it is hanging to make sure my above assertions are correct. We have several tables internally that have 1000s of partitions, and while it takes ~1 minute initially to discover the metadata, after that we are able to query the data interactively.

On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang jerrickho...@gmail.com wrote: Anybody have any suggestions?

On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Is there a workaround without updating Hadoop? Would really appreciate it if someone can explain what Spark is trying to do here and what is an easy way to turn this off. Thanks all!

On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: Did you try with Hadoop version 2.7.1? It is known that s3a works really well with parquet, which is available in 2.7. They fixed a lot of issues related to metadata reading there...

On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote: @Cheng, Hao: Physical plans show that it got stuck on scanning S3!
(table is partitioned by date_prefix and hour)

explain select count(*) from test_table where date_prefix='20150819' and hour='00';

TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], value=[(count(1),mode=Partial,isDistinct=false)]
   Scan ParquetRelation[ .. about 1000 partition paths go here ]

Why does Spark have to scan all partitions when the query only concerns 1 partition? Doesn't it defeat the purpose of partitioning? Thanks!

On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com wrote: I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and I couldn't find much information about it online. What does it mean exactly to disable it? Are there any negative consequences to disabling it?

On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote: Can you do some more profiling? I am wondering if the driver is busy with scanning HDFS / S3, e.g. jstack <pid of driver process>. Also, it would be great if you could paste the physical plan for the simple query.

From: Jerrick Hoang [mailto:jerrickho...@gmail.com] Sent: Thursday, August 20, 2015 1:46 PM To: Cheng, Hao Cc: Philip Weaver; user Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions

I cloned from TOT after the 1.5.0 cutoff. I noticed there were a couple of CLs trying to speed up Spark SQL with tables with a huge number of partitions; I've made sure that those CLs are included, but it's still very slow.

On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote: Yes, you can try setting spark.sql.sources.partitionDiscovery.enabled to false. BTW, which version are you using?
Hao

From: Jerrick Hoang [mailto:jerrickho...@gmail.com] Sent: Thursday, August 20, 2015 12:16 PM To: Philip Weaver Cc: user Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions

I guess the question is why does Spark have to do partition discovery with all partitions when the query only needs to look at one partition? Is there a conf flag to turn this off?

On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com wrote: I've had the same problem. It turns out that Spark (specifically parquet) is very slow at partition discovery. It got better in 1.5 (not yet released), but was still unacceptably slow. Sadly, we ended up reading parquet files manually in Python (via C++) and had to abandon Spark SQL because of this problem.

On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I did a simple experiment with Spark SQL. I created a partitioned parquet table with only one partition (date=20140701). A simple `select count(*) from table where date=20140701` would run very fast (0.1 seconds). However, as I
Re: Spark GraphaX
GraphX is a graph analytics engine rather than a graph database. Its typical use case is running large-scale graph algorithms like PageRank, connected components, label propagation and so on. It can be an element of complex processing pipelines that involve other Spark components such as DataFrames, machine learning and Spark Streaming. If you need to store, update and query graph structures, you might be better served looking at Neo4j or Titan. If you still need the analytics capability, you can integrate Spark with the database. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-GraphaX-tp24408p24411.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
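The algorithms named above are simple to state even though GraphX's value is running them at scale. As a toy illustration only (pure Python with union-find, not the GraphX API), connected components can be computed like this, labeling each vertex with the smallest vertex id in its component, which mirrors the convention GraphX's connectedComponents() uses:

```python
def connected_components(num_vertices, edges):
    """Label each vertex with the minimum vertex id in its component."""
    parent = list(range(num_vertices))

    def find(v):
        # Walk to the root, compressing the path as we go.
        while parent[v] != v:
            parent[v] = parent[parent[v]]
            v = parent[v]
        return v

    for a, b in edges:
        ra, rb = find(a), find(b)
        if ra != rb:
            # Attach the larger root under the smaller one, so the
            # surviving root is the minimum id in the component.
            if ra < rb:
                parent[rb] = ra
            else:
                parent[ra] = rb

    return [find(v) for v in range(num_vertices)]

# Two components: {0, 1, 2} and {3, 4}
print(connected_components(5, [(0, 1), (1, 2), (3, 4)]))  # [0, 0, 0, 3, 3]
```

GraphX expresses the same computation as message passing over a distributed graph, which is what makes it practical on graphs that do not fit on one machine.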
Re: Spark Sql behaves strangely with tables with a lot of partitions
We should not be actually scanning all of the data of all of the partitions, but we do need to at least list all of the available directories so that we can apply your predicates to the actual values that are present when we are deciding which files need to be read in a given Spark job. While this is a somewhat expensive operation, we do it in parallel and we cache this information when you access the same relation more than once. Can you provide a little more detail about how exactly you are accessing the parquet data (are you using sqlContext.read or creating persistent tables in the metastore?), and how long it is taking? It would also be good to know how many partitions we are talking about and how much data is in each. Finally, I'd like to see the stack trace where it is hanging to make sure my above assertions are correct. We have several tables internally that have 1000s of partitions, and while it takes ~1 minute initially to discover the metadata, after that we are able to query the data interactively.

On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang jerrickho...@gmail.com wrote: Anybody have any suggestions?

On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Is there a workaround without updating Hadoop? Would really appreciate it if someone can explain what Spark is trying to do here and what is an easy way to turn this off. Thanks all!

On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: Did you try with Hadoop version 2.7.1? It is known that s3a works really well with parquet, which is available in 2.7. They fixed a lot of issues related to metadata reading there...

On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote: @Cheng, Hao: Physical plans show that it got stuck on scanning S3!
(table is partitioned by date_prefix and hour)

explain select count(*) from test_table where date_prefix='20150819' and hour='00';

TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], value=[(count(1),mode=Partial,isDistinct=false)]
   Scan ParquetRelation[ .. about 1000 partition paths go here ]

Why does Spark have to scan all partitions when the query only concerns 1 partition? Doesn't it defeat the purpose of partitioning? Thanks!

On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com wrote: I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and I couldn't find much information about it online. What does it mean exactly to disable it? Are there any negative consequences to disabling it?

On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote: Can you do some more profiling? I am wondering if the driver is busy with scanning HDFS / S3, e.g. jstack <pid of driver process>. Also, it would be great if you could paste the physical plan for the simple query.

From: Jerrick Hoang [mailto:jerrickho...@gmail.com] Sent: Thursday, August 20, 2015 1:46 PM To: Cheng, Hao Cc: Philip Weaver; user Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions

I cloned from TOT after the 1.5.0 cutoff. I noticed there were a couple of CLs trying to speed up Spark SQL with tables with a huge number of partitions; I've made sure that those CLs are included, but it's still very slow.

On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote: Yes, you can try setting spark.sql.sources.partitionDiscovery.enabled to false. BTW, which version are you using?
Hao

From: Jerrick Hoang [mailto:jerrickho...@gmail.com] Sent: Thursday, August 20, 2015 12:16 PM To: Philip Weaver Cc: user Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions

I guess the question is why does Spark have to do partition discovery with all partitions when the query only needs to look at one partition? Is there a conf flag to turn this off?

On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com wrote: I've had the same problem. It turns out that Spark (specifically parquet) is very slow at partition discovery. It got better in 1.5 (not yet released), but was still unacceptably slow. Sadly, we ended up reading parquet files manually in Python (via C++) and had to abandon Spark SQL because of this problem.

On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I did a simple experiment with Spark SQL. I created a partitioned parquet table with only one partition (date=20140701). A simple `select count(*) from table where date=20140701` would run very fast (0.1 seconds). However, as I added more partitions the query takes longer and longer. When I added about 10,000 partitions, the query took way too long. I feel like querying for a single partition should not be affected by having more
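The mechanism described in this thread -- list the partition directories once, then apply the query's predicates to the values encoded in the paths to decide which files to read -- can be sketched in a few lines. This is a toy Python illustration of the idea, not Spark's actual implementation:

```python
def prune_partitions(partition_paths, predicates):
    """Keep only paths whose key=value segments satisfy every predicate.

    partition_paths: directory paths with Hive-style partition segments,
        e.g. 's3://bucket/test_table/date_prefix=20150819/hour=00'
    predicates: equality predicates as a dict of column -> required value,
        e.g. {'date_prefix': '20150819'}
    """
    kept = []
    for path in partition_paths:
        # Parse 'key=value' path segments into a column -> value mapping.
        values = dict(
            seg.split("=", 1) for seg in path.split("/") if "=" in seg
        )
        if all(values.get(col) == want for col, want in predicates.items()):
            kept.append(path)
    return kept

paths = [
    "s3://bucket/test_table/date_prefix=20150819/hour=00",
    "s3://bucket/test_table/date_prefix=20150819/hour=01",
    "s3://bucket/test_table/date_prefix=20150818/hour=00",
]
# Only one directory survives both predicates, so only its files are read.
print(prune_partitions(paths, {"date_prefix": "20150819", "hour": "00"}))
# ['s3://bucket/test_table/date_prefix=20150819/hour=00']
```

The cost the thread complains about is the listing step itself: the pruning is cheap, but building `partition_paths` for 10,000 directories on S3 requires many slow metadata calls, which is what the Hadoop 2.7 s3a fixes and Spark 1.5 discovery improvements address.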
DataFrame rollup with alias?
Hello, I am new to Spark and just running some tests to get familiar with the APIs. When calling the rollup function on my DataFrame, I get different results when I alias the columns I am grouping on (see below for an example data set). I was expecting the alias function to only affect the column name. Why is it also affecting the rollup results? (I know I can rename my columns after the rollup call, using the withColumnRenamed function; my question is just to get a better understanding of the alias function.)

scala> df.show
+----+------+-----+
|Name|  Game|Score|
+----+------+-----+
| Bob|Game 1|   20|
| Bob|Game 2|   30|
| Lea|Game 1|   25|
| Lea|Game 2|   30|
| Ben|Game 1|    5|
| Ben|Game 3|   35|
| Bob|Game 3|   15|
+----+------+-----+

// rollup results as expected
scala> df.rollup(df("Name"), df("Game")).sum().orderBy("Name", "Game").show
+----+------+----------+
|Name|  Game|SUM(Score)|
+----+------+----------+
|null|  null|       160|
| Ben|  null|        40|
| Ben|Game 1|         5|
| Ben|Game 3|        35|
| Bob|  null|        65|
| Bob|Game 1|        20|
| Bob|Game 2|        30|
| Bob|Game 3|        15|
| Lea|  null|        55|
| Lea|Game 1|        25|
| Lea|Game 2|        30|
+----+------+----------+

// rollup with aliases returns strange results
scala> df.rollup(df("Name") as "Player", df("Game") as "Round").sum().orderBy("Player", "Round").show
+------+------+----------+
|Player| Round|SUM(Score)|
+------+------+----------+
|   Ben|Game 1|         5|
|   Ben|Game 1|         5|
|   Ben|Game 1|         5|
|   Ben|Game 3|        35|
|   Ben|Game 3|        35|
|   Ben|Game 3|        35|
|   Bob|Game 1|        20|
|   Bob|Game 1|        20|
|   Bob|Game 1|        20|
|   Bob|Game 2|        30|
|   Bob|Game 2|        30|
|   Bob|Game 2|        30|
|   Bob|Game 3|        15|
|   Bob|Game 3|        15|
|   Bob|Game 3|        15|
|   Lea|Game 1|        25|
|   Lea|Game 1|        25|
|   Lea|Game 1|        25|
|   Lea|Game 2|        30|
|   Lea|Game 2|        30|
+------+------+----------+

Thanks in advance for your help,
Isabelle
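For reference, the expected (non-aliased) output can be reproduced in plain Python: rollup(Name, Game) aggregates over the grouping sets (Name, Game), (Name), and (), which is where the null subtotal rows come from. A toy illustration of the semantics (not Spark's implementation), using the data set above:

```python
from collections import defaultdict

rows = [
    ("Bob", "Game 1", 20), ("Bob", "Game 2", 30), ("Lea", "Game 1", 25),
    ("Lea", "Game 2", 30), ("Ben", "Game 1", 5),  ("Ben", "Game 3", 35),
    ("Bob", "Game 3", 15),
]

def rollup_sum(rows):
    """Sum Score over the rollup grouping sets (Name, Game), (Name), ().

    None plays the role of the null produced by Spark's rollup for the
    column(s) not included in a grouping set.
    """
    sums = defaultdict(int)
    for name, game, score in rows:
        sums[(name, game)] += score   # finest grouping: (Name, Game)
        sums[(name, None)] += score   # per-Name subtotal: Game -> null
        sums[(None, None)] += score   # grand total: both -> null
    return dict(sums)

totals = rollup_sum(rows)
print(totals[(None, None)])      # 160 (grand total)
print(totals[("Ben", None)])     # 40  (Ben's subtotal)
print(totals[("Bob", "Game 2")]) # 30
```

The aliased output above is missing exactly these null subtotal rows (and triplicates the rest), which is why it looks like the alias is being applied before the grouping sets are expanded rather than merely renaming the output columns.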