Re: How to parse multiple event types using Kafka

2015-08-23 Thread Cody Koeninger
Each Spark partition will contain messages from only a single Kafka
topic/partition.  Use HasOffsetRanges to tell which Kafka topic/partition
it came from.  See the docs:
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
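
[Editor's note] A minimal sketch (Scala) of the pattern described above; `stream` stands in for a
direct stream of (key, message) pairs created with KafkaUtils.createDirectStream, and the
routing logic is a placeholder. The Java API exposes the same transform/foreachRDD hooks.

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    stream.transform { rdd =>
      // Must be called on the RDD produced directly by the direct stream.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.mapPartitionsWithIndex { (i, iter) =>
        val topic = offsetRanges(i).topic   // Kafka topic this Spark partition came from
        iter.map { case (_, message) => (topic, message) }
      }
    }.foreachRDD { rdd =>
      rdd.foreach { case (topic, message) =>
        println(s"$topic: $message")        // placeholder: route message to the parser for this topic
      }
    }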

On Sun, Aug 23, 2015 at 10:56 AM, Spark Enthusiast sparkenthusi...@yahoo.in
 wrote:

 Folks,

 I use the following Streaming API from KafkaUtils :

 public JavaPairInputDStream<String, String> inputDStream() {

     HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(), brokers);

     return KafkaUtils.createDirectStream(
             streamingContext,
             String.class,
             String.class,
             StringDecoder.class,
             StringDecoder.class,
             kafkaParams,
             topicsSet
     );
 }


 I catch the messages using :

 JavaDStream<String> messages = inputDStream.map(new Function<Tuple2<String, String>, String>() {
     @Override
     public String call(Tuple2<String, String> tuple2) {
         return tuple2._2();
     }
 });


 My problem is that each of these Kafka topics streams a different message type.
 How do I distinguish messages that are of type1 from messages that are of type2, etc.?


 I tried the following:


 private class ParseEvents<T> implements Function<String, T> {
     final Class<T> parameterClass;

     private ParseEvents(Class<T> parameterClass) {
         this.parameterClass = parameterClass;
     }

     public T call(String message) throws Exception {
         ObjectMapper mapper = new ObjectMapper();

         T parsedMessage = null;

         try {
             parsedMessage = mapper.readValue(message, this.parameterClass);
         } catch (Exception e1) {
             logger.error("Ignoring Unknown Message %s", message);
         }
         return parsedMessage;
     }
 }

 JavaDStream<Type1> type1Events = messages.map(new ParseEvents<Type1>(Type1.class));

 JavaDStream<Type2> type2Events = messages.map(new ParseEvents<Type2>(Type2.class));

 JavaDStream<Type3> type3Events = messages.map(new ParseEvents<Type3>(Type3.class));


 But this does not work, because the Type1 parser catches Type2 messages and ignores them.
 Is there a clean way of handling this?







Re: Spark streaming multi-tasking during I/O

2015-08-23 Thread Akhil Das
If you set the concurrentJobs flag to 2, it lets you run two jobs in
parallel. It will be a bit hard for you to predict the application behavior
with this flag, though, so debugging could become a headache.
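
[Editor's note] A minimal sketch of how that flag might be set when the StreamingContext is
created; the app name and batch interval below are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("my-streaming-app")                  // placeholder app name
      .set("spark.streaming.concurrentJobs", "2")      // experimental: allow 2 streaming jobs at once

    val ssc = new StreamingContext(conf, Seconds(10))  // placeholder batch interval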

Thanks
Best Regards

On Sun, Aug 23, 2015 at 10:36 AM, Sateesh Kavuri sateesh.kav...@gmail.com
wrote:

 Hi Akhil,

 Think of the scenario as running a piece of code in normal Java with
 multiple threads. Let's say there are 4 threads spawned by a Java process to
 handle reading from a database, some processing, and storing to the database. In
 this process, while a thread is performing database I/O, the CPU could
 allow another thread to perform the processing, thus efficiently using the
 resources.

 In the case of Spark, while a node executor is running the same "read from DB
 => process data => store to DB" flow, during the "read from DB" and "store to
 DB" phases the CPU is not given to other requests in the queue, since the
 executor allocates its resources completely to the current ongoing
 request.

 Doesn't the spark.streaming.concurrentJobs flag enable this kind of scenario,
 or is there any other way to achieve what I am looking for?

 Thanks,
 Sateesh

 On Sat, Aug 22, 2015 at 7:26 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hmm, for a single-core VM you will have to run it in local mode (specifying
 master=local[4]). The flag is available in all versions of Spark, I
 guess.
 On Aug 22, 2015 5:04 AM, Sateesh Kavuri sateesh.kav...@gmail.com
 wrote:

 Thanks Akhil. Does this mean that the executor running in the VM can
 spawn two concurrent jobs on the same core? If this is the case, this is
 what we are looking for. Also, which version of Spark is this flag in?

 Thanks,
 Sateesh

 On Sat, Aug 22, 2015 at 1:44 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 You can look at spark.streaming.concurrentJobs; by default it runs a
 single job. If you set it to 2 then it can run 2 jobs in parallel. It's an
 experimental flag, but go ahead and give it a try.
 On Aug 21, 2015 3:36 AM, Sateesh Kavuri sateesh.kav...@gmail.com
 wrote:

 Hi,

 My scenario goes like this:
 I have an algorithm running in Spark streaming mode on a 4 core
 virtual machine. The majority of the time, the algorithm does disk I/O and
 database I/O. The question is: during the I/O, when the CPU is not
 considerably loaded, is it possible to run any other task/thread so as to
 efficiently utilize the CPU?

 Note that one DStream of the algorithm runs completely on a single CPU

 Thank you,
 Sateesh






Re: Error when saving a dataframe as ORC file

2015-08-23 Thread lostrain A
Hi Ted,
  Thanks for the reply. I tried setting both the access key ID and secret access key via

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***")
 sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**")


However, the error still occurs for the ORC format.

If I change the format to JSON, although the error does not go away, the JSON
files can be saved successfully.




On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu yuzhih...@gmail.com wrote:

 You may have seen this:
 http://search-hadoop.com/m/q3RTtdSyM52urAyI



 On Aug 23, 2015, at 1:01 AM, lostrain A donotlikeworkingh...@gmail.com
 wrote:

 Hi,
   I'm trying to save a simple dataframe to S3 in ORC format. The code is
 as follows:


  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
   import sqlContext.implicits._
   val df=sc.parallelize(1 to 1000).toDF()
   df.write.format(orc).save(s3://logs/dummy)


 I ran the above code in spark-shell and only the _SUCCESS file was saved
 under the directory.
 The last part of the spark-shell log said:

 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task
 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100)



 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage
 2 (save at console:29) finished in 0.834 s



 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed
 TaskSet 2.0, whose tasks have all completed, from pool



 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at
 console:29, took 0.895912 s



 15/08/23 07:38:24 main INFO
 LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory:
 /media/ephemeral0/s3/output-



 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for
 dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78,
  4, -23, -103, 9, -104, -20, -8, 66, 126]



 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__
 committed.


 Anyone has experienced this before?
 Thanks!





Re: Error when saving a dataframe as ORC file

2015-08-23 Thread lostrain A
Ted,
  Thanks for the suggestions. Actually I tried both s3n and s3 and the
result remains the same.


On Sun, Aug 23, 2015 at 12:27 PM, Ted Yu yuzhih...@gmail.com wrote:

 In your case, I would specify fs.s3.awsAccessKeyId /
 fs.s3.awsSecretAccessKey since you use s3 protocol.

 On Sun, Aug 23, 2015 at 11:03 AM, lostrain A 
 donotlikeworkingh...@gmail.com wrote:

 Hi Ted,
   Thanks for the reply. I tried setting both of the keyid and accesskey
 via

 sc.hadoopConfiguration.set(fs.s3n.awsAccessKeyId, ***)
 sc.hadoopConfiguration.set(fs.s3n.awsSecretAccessKey, **)


 However, the error still occurs for ORC format.

 If I change the format to JSON, although the error does not go, the JSON
 files can be saved successfully.




 On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu yuzhih...@gmail.com wrote:

 You may have seen this:
 http://search-hadoop.com/m/q3RTtdSyM52urAyI



 On Aug 23, 2015, at 1:01 AM, lostrain A donotlikeworkingh...@gmail.com
 wrote:

 Hi,
   I'm trying to save a simple dataframe to S3 in ORC format. The code is
 as follows:


  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
   import sqlContext.implicits._
   val df=sc.parallelize(1 to 1000).toDF()
   df.write.format(orc).save(s3://logs/dummy)


 I ran the above code in spark-shell and only the _SUCCESS file was saved
 under the directory.
 The last part of the spark-shell log said:

 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished
 task 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal
 (100/100)



 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler:
 ResultStage 2 (save at console:29) finished in 0.834 s



 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed
 TaskSet 2.0, whose tasks have all completed, from pool



 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at
 console:29, took 0.895912 s



 15/08/23 07:38:24 main INFO
 LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory:
 /media/ephemeral0/s3/output-



 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for
 dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78,
  4, -23, -103, 9, -104, -20, -8, 66, 126]



 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__
 committed.


 Anyone has experienced this before?
 Thanks!







B2i Healthcare Powered by Spark addition

2015-08-23 Thread Brandon Ulrich
Another addition to the Powered by Spark page:

B2i Healthcare (http://b2i.sg) uses Spark in healthcare analytics with
medical ontologies like SNOMED CT. Our Snow Owl MQ (
http://b2i.sg/snow-owl-mq) product relies on the Spark ecosystem to analyze
~1 billion health records with over 70 healthcare terminologies. An online
demo is available at https://mq.b2i.sg. Use cases include:

   - Creating cohorts to group patients with similar demographic traits,
   drug exposures, clinical findings, procedures, and observations (Spark
   Core, Spark SQL, GraphX)
   - Inspecting patient records to identify trends and correlations (SparkR)
   - Statistical analysis of patient cohorts to test and verify clinical
   hypotheses (MLlib)
   - Identification of potential adverse drug events and interactions with
   pharmacovigilance signal detection (Streaming)

Thanks,
Brandon


Re: How to set environment of worker applications

2015-08-23 Thread Sathish Kumaran Vairavelu
spark-env.sh works for me in Spark 1.4 but not
spark.executor.extraJavaOptions.

On Sun, Aug 23, 2015 at 11:27 AM Raghavendra Pandey 
raghavendra.pan...@gmail.com wrote:

 I think the only way to pass on environment variables to worker node is to
 write it in spark-env.sh file on each worker node.

 On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Check for spark.driver.extraJavaOptions and
 spark.executor.extraJavaOptions in the following article. I think you can
 use -D to pass system vars:

 spark.apache.org/docs/latest/configuration.html#runtime-environment
 Hi,

 I am starting a spark streaming job in standalone mode with spark-submit.

 Is there a way to make the UNIX environment variables with which
 spark-submit is started available to the processes started on the worker
 nodes?

 Jan
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





How to parse multiple event types using Kafka

2015-08-23 Thread Spark Enthusiast
Folks,
I use the following Streaming API from KafkaUtils :
public JavaPairInputDStream<String, String> inputDStream() {

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(), brokers);

    return KafkaUtils.createDirectStream(
            streamingContext,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
    );
}

I catch the messages using:

JavaDStream<String> messages = inputDStream.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});
My problem is that each of these Kafka topics streams a different message type.
How do I distinguish messages that are of type1 from messages that are of type2, etc.?
I tried the following:
private class ParseEvents<T> implements Function<String, T> {
    final Class<T> parameterClass;

    private ParseEvents(Class<T> parameterClass) {
        this.parameterClass = parameterClass;
    }

    public T call(String message) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        T parsedMessage = null;

        try {
            parsedMessage = mapper.readValue(message, this.parameterClass);
        } catch (Exception e1) {
            logger.error("Ignoring Unknown Message %s", message);
        }
        return parsedMessage;
    }
}

JavaDStream<Type1> type1Events = messages.map(new ParseEvents<Type1>(Type1.class));
JavaDStream<Type2> type2Events = messages.map(new ParseEvents<Type2>(Type2.class));
JavaDStream<Type3> type3Events = messages.map(new ParseEvents<Type3>(Type3.class));
But this does not work, because the Type1 parser catches Type2 messages and ignores them.
Is there a clean way of handling this?




Spark YARN executors are not launching when using +UseG1GC

2015-08-23 Thread unk1102
Hi, I am hitting an issue of long GC pauses in my Spark job; because of it,
YARN is killing executors one by one and the Spark job becomes slower and
slower. I came across this article where they mention using G1GC. I
tried to use the same command but something seems wrong:

https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html

./spark-submit --class com.xyz.MySpark --conf
spark.executor.extraJavaOptions=-XX:MaxPermSize=512M -XX:+UseG1GC
-XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy
-XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -Xms25g -Xmx25g
-XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThread=20
--driver-java-options -XX:MaxPermSize=512m --driver-memory 3g --master
yarn-client --executor-memory 25G --executor-cores 8 --num-executors 12 
/home/myuser/myspark-1.0.jar

First it said you can't use Xms/Xmx for the executor, so I removed them, but
executors never get launched if I use the above command. Please guide. Thanks in
advance.
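
[Editor's note] A hedged sketch, not the poster's exact fix: the same GC options expressed via
SparkConf in application code, with the executor heap controlled by spark.executor.memory
instead of -Xms/-Xmx. Note the HotSpot flag is spelled -XX:ConcGCThreads (trailing 's').

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.executor.memory", "25g")
      .set("spark.executor.extraJavaOptions",
        "-XX:MaxPermSize=512M -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps " +
        "-XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=20")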



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-YARN-executors-are-not-launching-when-using-UseG1GC-tp24407.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



is there a 'knack' to docker and mesos?

2015-08-23 Thread Dick Davies
Really excited to try out the new Docker executor support on 1.4.1. I'm
making progress but feel like I'm missing something.

(versions:

spark-1.4.1-hadoop2.6 - not using hadoop yet
mac os x yosemite java 8 spark-shell
mesos 0.22.1  : 2 slaves, 1 master + zk , all on centos 6.x
docker 1.8.x
)

I wanted to use a generic docker image with e.g. Java 8 in it, and then
deploy the 1.4.1 distro into it. The docs seem to indicate that's supported.


Mesos and Docker pull the requested docker image ('docker pull java',
essentially) and extract the spark.executor.uri distro correctly. It seems to
fall over at cd'ing, of all places:

cd: can't cd to spark-1*

This causes the task to fail and eventually makes Spark blacklist the slave.

Is this just because 'spark-1*' matches both the tarball and the directory?

full mesos stderr looks like:



I0823 19:13:25.608206  3069 fetcher.cpp:214] Fetching URI
'http://d3kbcqa49mib13.cloudfront.net/spark-1.4.1-bin-hadoop2.6.tgz'
I0823 19:13:25.608582  3069 fetcher.cpp:125] Fetching URI
'http://d3kbcqa49mib13.cloudfront.net/spark-1.4.1-bin-hadoop2.6.tgz'
with os::net
I0823 19:13:25.608620  3069 fetcher.cpp:135] Downloading
'http://d3kbcqa49mib13.cloudfront.net/spark-1.4.1-bin-hadoop2.6.tgz'
to 
'/var/mesos/slaves/20150823-110659-1862270986-5050-3230-S1/frameworks/20150823-191138-1862270986-5050-3768-/executors/0/runs/a5a13cd4-013a-4ebc-8ef7-eb9c33358429/spark-1.4.1-bin-hadoop2.6.tgz'
I0823 19:14:37.765060  3069 fetcher.cpp:78] Extracted resource
'/var/mesos/slaves/20150823-110659-1862270986-5050-3230-S1/frameworks/20150823-191138-1862270986-5050-3768-/executors/0/runs/a5a13cd4-013a-4ebc-8ef7-eb9c33358429/spark-1.4.1-bin-hadoop2.6.tgz'
into 
'/var/mesos/slaves/20150823-110659-1862270986-5050-3230-S1/frameworks/20150823-191138-1862270986-5050-3768-/executors/0/runs/a5a13cd4-013a-4ebc-8ef7-eb9c33358429'
/bin/sh: 1: cd: can't cd to spark-1*
/bin/sh: 1: ./bin/spark-class: not found
I0823 19:14:38.365190  3138 exec.cpp:132] Version: 0.22.1
I0823 19:14:38.369495  3156 exec.cpp:206] Executor registered on slave
20150823-110659-1862270986-5050-3230-S1

-

and the spark-shell says:

15/08/23 20:15:51 INFO CoarseMesosSchedulerBackend: Mesos task 3 is
now TASK_RUNNING
15/08/23 20:15:51 INFO CoarseMesosSchedulerBackend: Mesos task 3 is
now TASK_FAILED
15/08/23 20:15:51 INFO CoarseMesosSchedulerBackend: Blacklisting Mesos
slave value: 20150823-110659-1862270986-5050-3230-S0
 due to too many failures; is Spark installed on it?
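
[Editor's note] For context, a minimal sketch (Scala) of the configuration being described above;
the image name mirrors the generic Java image mentioned earlier, and this is not a confirmed
fix for the cd failure.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.mesos.executor.docker.image", "java:8")  // generic JDK image, as described above
      .set("spark.executor.uri",
           "http://d3kbcqa49mib13.cloudfront.net/spark-1.4.1-bin-hadoop2.6.tgz")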

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Error when saving a dataframe as ORC file

2015-08-23 Thread Ted Yu
In your case, I would specify fs.s3.awsAccessKeyId /
fs.s3.awsSecretAccessKey since you use s3 protocol.
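
[Editor's note] A minimal sketch of that suggestion, mirroring the spark-shell snippet quoted
below but with the s3 (not s3n) keys; the values are placeholders, not real credentials.

    sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "<your-secret-access-key>")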

On Sun, Aug 23, 2015 at 11:03 AM, lostrain A donotlikeworkingh...@gmail.com
 wrote:

 Hi Ted,
   Thanks for the reply. I tried setting both of the keyid and accesskey via

 sc.hadoopConfiguration.set(fs.s3n.awsAccessKeyId, ***)
 sc.hadoopConfiguration.set(fs.s3n.awsSecretAccessKey, **)


 However, the error still occurs for ORC format.

 If I change the format to JSON, although the error does not go, the JSON
 files can be saved successfully.




 On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu yuzhih...@gmail.com wrote:

 You may have seen this:
 http://search-hadoop.com/m/q3RTtdSyM52urAyI



 On Aug 23, 2015, at 1:01 AM, lostrain A donotlikeworkingh...@gmail.com
 wrote:

 Hi,
   I'm trying to save a simple dataframe to S3 in ORC format. The code is
 as follows:


  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
   import sqlContext.implicits._
   val df=sc.parallelize(1 to 1000).toDF()
   df.write.format(orc).save(s3://logs/dummy)


 I ran the above code in spark-shell and only the _SUCCESS file was saved
 under the directory.
 The last part of the spark-shell log said:

 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task
 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100)



 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler:
 ResultStage 2 (save at console:29) finished in 0.834 s



 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed
 TaskSet 2.0, whose tasks have all completed, from pool



 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at
 console:29, took 0.895912 s



 15/08/23 07:38:24 main INFO
 LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory:
 /media/ephemeral0/s3/output-



 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for
 dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78,
  4, -23, -103, 9, -104, -20, -8, 66, 126]



 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__
 committed.


 Anyone has experienced this before?
 Thanks!






Re: Spark Mesos Dispatcher

2015-08-23 Thread bcajes
I'm currently having the same issues.  The documentation for Mesos dispatcher
is sparse.  I'll also add that I'm able to see the framework running in the
mesos and spark driver UIs, but when viewing the spark job ui on a slave, no
job is seen.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Mesos-Dispatcher-tp24238p24404.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to set environment of worker applications

2015-08-23 Thread Raghavendra Pandey
I think the only way to pass on environment variables to worker node is to
write it in spark-env.sh file on each worker node.

On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com
wrote:

 Check for spark.driver.extraJavaOptions and
 spark.executor.extraJavaOptions in the following article. I think you can
 use -D to pass system vars:

 spark.apache.org/docs/latest/configuration.html#runtime-environment
 Hi,

 I am starting a spark streaming job in standalone mode with spark-submit.

 Is there a way to make the UNIX environment variables with which
 spark-submit is started available to the processes started on the worker
 nodes?

 Jan
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Error when saving a dataframe as ORC file

2015-08-23 Thread lostrain A
Hi,
  I'm trying to save a simple dataframe to S3 in ORC format. The code is as
follows:


 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
 import sqlContext.implicits._
 val df = sc.parallelize(1 to 1000).toDF()
 df.write.format("orc").save("s3://logs/dummy")


I ran the above code in spark-shell and only the _SUCCESS file was saved
under the directory.
The last part of the spark-shell log said:

15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task
 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100)



 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage
 2 (save at console:29) finished in 0.834 s



 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed TaskSet
 2.0, whose tasks have all completed, from pool



 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at
 console:29, took 0.895912 s



 15/08/23 07:38:24 main INFO
 LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory:
 /media/ephemeral0/s3/output-



 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for
 dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78,
  4, -23, -103, 9, -104, -20, -8, 66, 126]



 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__
 committed.


Has anyone experienced this before?
Thanks!


Re: Error when saving a dataframe as ORC file

2015-08-23 Thread Ted Yu
You may have seen this:
http://search-hadoop.com/m/q3RTtdSyM52urAyI



 On Aug 23, 2015, at 1:01 AM, lostrain A donotlikeworkingh...@gmail.com 
 wrote:
 
 Hi,
   I'm trying to save a simple dataframe to S3 in ORC format. The code is as 
 follows:
 
 
  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
   import sqlContext.implicits._
   val df=sc.parallelize(1 to 1000).toDF()
   df.write.format(orc).save(s3://logs/dummy)
 
 I ran the above code in spark-shell and only the _SUCCESS file was saved 
 under the directory.
 The last part of the spark-shell log said:
 
 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task 
 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100)
  
 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage 2 
 (save at console:29) finished in 0.834 s
  
 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed TaskSet 
 2.0, whose tasks have all completed, from pool
  
 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at 
 console:29, took 0.895912 s
  
 15/08/23 07:38:24 main INFO 
 LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory: 
 /media/ephemeral0/s3/output-
  
 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for dummy/_SUCCESS 
 is [-44, 29, -128, -39, -113, 0, -78,
  4, -23, -103, 9, -104, -20, -8, 66, 126]
  
 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__ 
 committed.
 
 Anyone has experienced this before?
 Thanks!
  


Re: How to set environment of worker applications

2015-08-23 Thread Hemant Bhanawat
Check for spark.driver.extraJavaOptions and spark.executor.extraJavaOptions
in the following article. I think you can use -D to pass system vars:

spark.apache.org/docs/latest/configuration.html#runtime-environment
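
[Editor's note] A minimal sketch of that -D approach; the property name my.setting is hypothetical.

    import org.apache.spark.SparkConf

    // Executor-side: a -D system property forwarded to every executor JVM.
    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions", "-Dmy.setting=foo")
    // spark.driver.extraJavaOptions is normally passed at submit time
    // (e.g. --conf spark.driver.extraJavaOptions=-Dmy.setting=foo), since the
    // driver JVM is already running when application code executes.

    // Inside a task (or on the driver), read the property back:
    val value = sys.props.getOrElse("my.setting", "default")
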
Hi,

I am starting a spark streaming job in standalone mode with spark-submit.

Is there a way to make the UNIX environment variables with which
spark-submit is started available to the processes started on the worker
nodes?

Jan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


How to set environment of worker applications

2015-08-23 Thread Jan Algermissen
Hi,

I am starting a spark streaming job in standalone mode with spark-submit.

Is there a way to make the UNIX environment variables with which spark-submit 
is started available to the processes started on the worker nodes?

Jan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Mesos Dispatcher

2015-08-23 Thread Timothy Chen
Hi Bcjaes,

Sorry I didn't see the previous thread so not sure what issues you are running 
into.

In cluster mode the driver logs and results are all available through the Mesos 
UI, you need to look at terminated frameworks if it's a job that's already 
finished.

I'll try to add more docs as we are still completing some other features around 
cluster mode on Mesos.

Tim


 On Aug 23, 2015, at 7:22 AM, bcajes brian.ca...@gmail.com wrote:
 
 I'm currently having the same issues.  The documentation for Mesos dispatcher
 is sparse.  I'll also add that I'm able to see the framework running in the
 mesos and spark driver UIs, but when viewing the spark job ui on a slave, no
 job is seen.
 
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Mesos-Dispatcher-tp24238p24404.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-23 Thread Jerrick Hoang
anybody has any suggestions?

On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com
wrote:

 Is there a workaround without updating Hadoop? Would really appreciate if
 someone can explain what spark is trying to do here and what is an easy way
 to turn this off. Thanks all!

 On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 Did you try with hadoop version 2.7.1 .. It is known that s3a works
 really well with parquet which is available in 2.7. They fixed lot of
 issues related to metadata reading there...
 On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote:

 @Cheng, Hao : Physical plans show that it got stuck on scanning S3!

 (table is partitioned by date_prefix and hour)
 explain select count(*) from test_table where date_prefix='20150819' and
 hour='00';

 TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
  TungstenExchange SinglePartition
   TungstenAggregate(key=[],
 value=[(count(1),mode=Partial,isDistinct=false)]
Scan ParquetRelation[ .. about 1000 partition paths go here ]

 Why does spark have to scan all partitions when the query only concerns
 with 1 partitions? Doesn't it defeat the purpose of partitioning?

 Thanks!

 On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before,
 and I couldn't find much information about it online. What does it mean
 exactly to disable it? Are there any negative consequences to disabling it?

 On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com
 wrote:

 Can you make some more profiling? I am wondering if the driver is busy
 with scanning the HDFS / S3.

 Like jstack pid of driver process



 And also, it’s will be great if you can paste the physical plan for
 the simple query.



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 1:46 PM
 *To:* Cheng, Hao
 *Cc:* Philip Weaver; user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I cloned from TOT after 1.5.0 cut off. I noticed there were a couple
 of CLs trying to speed up spark sql with tables with a huge number of
 partitions, I've made sure that those CLs are included but it's still very
 slow



 On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com
 wrote:

 Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled
 to false.



 BTW, which version are you using?



 Hao



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 12:16 PM
 *To:* Philip Weaver
 *Cc:* user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I guess the question is why does spark have to do partition discovery
 with all partitions when the query only needs to look at one partition? Is
 there a conf flag to turn this off?



 On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver 
 philip.wea...@gmail.com wrote:

 I've had the same problem. It turns out that Spark (specifically
 parquet) is very slow at partition discovery. It got better in 1.5 (not 
 yet
 released), but was still unacceptably slow. Sadly, we ended up reading
 parquet files manually in Python (via C++) and had to abandon Spark SQL
 because of this problem.



 On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 Hi all,



 I did a simple experiment with Spark SQL. I created a partitioned
 parquet table with only one partition (date=20140701). A simple `select
 count(*) from table where date=20140701` would run very fast (0.1 
 seconds).
 However, as I added more partitions the query takes longer and longer. 
 When
 I added about 10,000 partitions, the query took way too long. I feel like
 querying for a single partition should not be affected by having more
 partitions. Is this a known behaviour? What does spark try to do here?



 Thanks,

 Jerrick













Re: Error when saving a dataframe as ORC file

2015-08-23 Thread Ted Yu
SPARK-8458 is in 1.4.1 release.

You can upgrade to 1.4.1 or, wait for the upcoming 1.5.0 release.

On Sun, Aug 23, 2015 at 2:05 PM, lostrain A donotlikeworkingh...@gmail.com
wrote:

 Hi Zhan,
   Thanks for the point. Yes I'm using a cluster with spark-1.4.0 and it
 looks like this is most likely the reason. I'll verify this again once the
 we make the upgrade.

 Best,
 los

 On Sun, Aug 23, 2015 at 1:25 PM, Zhan Zhang zzh...@hortonworks.com
 wrote:

 If you are using spark-1.4.0, probably it is caused by SPARK-8458
 https://issues.apache.org/jira/browse/SPARK-8458

 Thanks.

 Zhan Zhang

 On Aug 23, 2015, at 12:49 PM, lostrain A donotlikeworkingh...@gmail.com
 wrote:

 Ted,
   Thanks for the suggestions. Actually I tried both s3n and s3 and the
 result remains the same.


 On Sun, Aug 23, 2015 at 12:27 PM, Ted Yu yuzhih...@gmail.com wrote:

 In your case, I would specify fs.s3.awsAccessKeyId /
 fs.s3.awsSecretAccessKey since you use s3 protocol.

 On Sun, Aug 23, 2015 at 11:03 AM, lostrain A 
 donotlikeworkingh...@gmail.com wrote:

 Hi Ted,
   Thanks for the reply. I tried setting both of the keyid and accesskey
 via

 sc.hadoopConfiguration.set(fs.s3n.awsAccessKeyId, ***)
 sc.hadoopConfiguration.set(fs.s3n.awsSecretAccessKey, **)


 However, the error still occurs for ORC format.

 If I change the format to JSON, although the error does not go, the
 JSON files can be saved successfully.




 On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu yuzhih...@gmail.com wrote:

 You may have seen this:
 http://search-hadoop.com/m/q3RTtdSyM52urAyI



 On Aug 23, 2015, at 1:01 AM, lostrain A 
 donotlikeworkingh...@gmail.com wrote:

 Hi,
   I'm trying to save a simple dataframe to S3 in ORC format. The code
 is as follows:


  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
   import sqlContext.implicits._
   val df=sc.parallelize(1 to 1000).toDF()
   df.write.format(orc).save(s3://logs/dummy)


 I ran the above code in spark-shell and only the _SUCCESS file was
 saved under the directory.
 The last part of the spark-shell log said:

 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished
 task 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal
 (100/100)



 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler:
 ResultStage 2 (save at console:29) finished in 0.834 s



 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed
 TaskSet 2.0, whose tasks have all completed, from pool



 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at
 console:29, took 0.895912 s



 15/08/23 07:38:24 main INFO
 LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory:
 /media/ephemeral0/s3/output-



 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for
 dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78,
  4, -23, -103, 9, -104, -20, -8, 66, 126]



 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__
 committed.


 Anyone has experienced this before?
 Thanks!










Re: Error when saving a dataframe as ORC file

2015-08-23 Thread Zhan Zhang
If you are using spark-1.4.0, it is probably caused by SPARK-8458
(https://issues.apache.org/jira/browse/SPARK-8458).

Thanks.

Zhan Zhang

On Aug 23, 2015, at 12:49 PM, lostrain A donotlikeworkingh...@gmail.com wrote:

Ted,
  Thanks for the suggestions. Actually I tried both s3n and s3 and the result 
remains the same.


On Sun, Aug 23, 2015 at 12:27 PM, Ted Yu yuzhih...@gmail.com wrote:
In your case, I would specify fs.s3.awsAccessKeyId / 
fs.s3.awsSecretAccessKey since you use s3 protocol.

On Sun, Aug 23, 2015 at 11:03 AM, lostrain A donotlikeworkingh...@gmail.com wrote:
Hi Ted,
  Thanks for the reply. I tried setting both of the keyid and accesskey via

sc.hadoopConfiguration.set(fs.s3n.awsAccessKeyId, ***)
sc.hadoopConfiguration.set(fs.s3n.awsSecretAccessKey, **)

However, the error still occurs for ORC format.

If I change the format to JSON, although the error does not go, the JSON files 
can be saved successfully.




On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu yuzhih...@gmail.com wrote:
You may have seen this:
http://search-hadoop.com/m/q3RTtdSyM52urAyI



On Aug 23, 2015, at 1:01 AM, lostrain A donotlikeworkingh...@gmail.com wrote:

Hi,
  I'm trying to save a simple dataframe to S3 in ORC format. The code is as 
follows:


 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
  import sqlContext.implicits._
  val df=sc.parallelize(1 to 1000).toDF()
  df.write.format(orc).save(s3://logs/dummy)

I ran the above code in spark-shell and only the _SUCCESS file was saved under 
the directory.
The last part of the spark-shell log said:

15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task 95.0 
in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100)

15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage 2 
(save at console:29) finished in 0.834 s

15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed TaskSet 2.0, 
whose tasks have all completed, from pool

15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at console:29, 
took 0.895912 s

15/08/23 07:38:24 main INFO LocalDirAllocator$AllocatorPerContext$DirSelector: 
Returning directory: /media/ephemeral0/s3/output-

15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for dummy/_SUCCESS is 
[-44, 29, -128, -39, -113, 0, -78,
 4, -23, -103, 9, -104, -20, -8, 66, 126]

15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__ committed.

Anyone has experienced this before?
Thanks!







Re: Error when saving a dataframe as ORC file

2015-08-23 Thread lostrain A
Hi Zhan,
  Thanks for the point. Yes I'm using a cluster with spark-1.4.0 and it
looks like this is most likely the reason. I'll verify this again once the
we make the upgrade.

Best,
los

On Sun, Aug 23, 2015 at 1:25 PM, Zhan Zhang zzh...@hortonworks.com wrote:

 If you are using spark-1.4.0, probably it is caused by SPARK-8458
 https://issues.apache.org/jira/browse/SPARK-8458

 Thanks.

 Zhan Zhang

 On Aug 23, 2015, at 12:49 PM, lostrain A donotlikeworkingh...@gmail.com
 wrote:

 Ted,
   Thanks for the suggestions. Actually I tried both s3n and s3 and the
 result remains the same.


 On Sun, Aug 23, 2015 at 12:27 PM, Ted Yu yuzhih...@gmail.com wrote:

 In your case, I would specify fs.s3.awsAccessKeyId /
 fs.s3.awsSecretAccessKey since you use s3 protocol.

 On Sun, Aug 23, 2015 at 11:03 AM, lostrain A 
 donotlikeworkingh...@gmail.com wrote:

 Hi Ted,
   Thanks for the reply. I tried setting both of the keyid and accesskey
 via

 sc.hadoopConfiguration.set(fs.s3n.awsAccessKeyId, ***)
 sc.hadoopConfiguration.set(fs.s3n.awsSecretAccessKey, **)


 However, the error still occurs for ORC format.

 If I change the format to JSON, although the error does not go, the JSON
 files can be saved successfully.




 On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu yuzhih...@gmail.com wrote:

 You may have seen this:
 http://search-hadoop.com/m/q3RTtdSyM52urAyI



 On Aug 23, 2015, at 1:01 AM, lostrain A donotlikeworkingh...@gmail.com
 wrote:

 Hi,
   I'm trying to save a simple dataframe to S3 in ORC format. The code
 is as follows:


  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
   import sqlContext.implicits._
   val df=sc.parallelize(1 to 1000).toDF()
   df.write.format(orc).save(s3://logs/dummy)


 I ran the above code in spark-shell and only the _SUCCESS file was
 saved under the directory.
 The last part of the spark-shell log said:

 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished
 task 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal
 (100/100)



 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler:
 ResultStage 2 (save at console:29) finished in 0.834 s



 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed
 TaskSet 2.0, whose tasks have all completed, from pool



 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at
 console:29, took 0.895912 s



 15/08/23 07:38:24 main INFO
 LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory:
 /media/ephemeral0/s3/output-



 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for
 dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78,
  4, -23, -103, 9, -104, -20, -8, 66, 126]



 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__
 committed.


 Anyone has experienced this before?
 Thanks!









Re: SparkSQL concerning materials

2015-08-23 Thread Michael Armbrust
Here's a longer version of that talk that I gave, which goes into more
detail on the internals:
http://www.slideshare.net/databricks/spark-sql-deep-dive-melbroune

On Fri, Aug 21, 2015 at 8:28 AM, Sameer Farooqui same...@databricks.com
wrote:

 Have you seen the Spark SQL paper?:
 https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf

 On Thu, Aug 20, 2015 at 11:35 PM, Dawid Wysakowicz 
 wysakowicz.da...@gmail.com wrote:

 Hi,

 thanks for answers. I have read answers you provided, but I rather look
 for some materials on the internals. E.g how the optimizer works, how the
 query is translated into rdd operations etc. The API I am quite familiar
 with.
 A good starting point for me was: Spark DataFrames: Simple and Fast
 Analysis of Structured Data
 https://www.brighttalk.com/webcast/12891/166495?utm_campaign=child-community-webcasts-feedutm_content=Big+Data+and+Data+Managementutm_source=brighttalk-portalutm_medium=webutm_term=

 2015-08-20 18:29 GMT+02:00 Dhaval Patel dhaval1...@gmail.com:

 Or if you're a python lover then this is a good place -
 https://spark.apache.org/docs/1.4.1/api/python/pyspark.sql.html#



 On Thu, Aug 20, 2015 at 10:58 AM, Ted Yu yuzhih...@gmail.com wrote:

 See also
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package

 Cheers

 On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif 
 muhammadatif...@gmail.com wrote:

 Hi Dawid

 The best pace to get started is the Spark SQL Guide from Apache
 http://spark.apache.org/docs/latest/sql-programming-guide.html

 Regards
 Muhammad

 On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz 
 wysakowicz.da...@gmail.com wrote:

 Hi,

 I would like to dip into SparkSQL. Get to know better the
 architecture, good practices, some internals. Could you advise me some
 materials on this matter?

 Regards
 Dawid









Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-23 Thread Philip Weaver
1 minute to discover 1000s of partitions -- yes, that is what I have
observed. And I would assert that is very slow.

On Sun, Aug 23, 2015 at 7:16 PM, Michael Armbrust mich...@databricks.com
wrote:

 We should not be actually scanning all of the data of all of the
 partitions, but we do need to at least list all of the available
 directories so that we can apply your predicates to the actual values that
 are present when we are deciding which files need to be read in a given
 spark job.  While this is a somewhat expensive operation, we do it in
 parallel and we cache this information when you access the same relation
 more than once.

 Can you provide a little more detail about how exactly you are accessing
 the parquet data (are you using sqlContext.read or creating persistent
 tables in the metastore?), and how long it is taking?  It would also be
 good to know how many partitions we are talking about and how much data is
 in each.  Finally, I'd like to see the stacktrace where it is hanging to
 make sure my above assertions are correct.

 We have several tables internally that have 1000s of partitions and while
 it takes ~1 minute initially to discover the metadata, after that we are
 able to query the data interactively.



 On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 anybody has any suggestions?

 On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 Is there a workaround without updating Hadoop? Would really appreciate
 if someone can explain what spark is trying to do here and what is an easy
 way to turn this off. Thanks all!

 On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 Did you try with hadoop version 2.7.1 .. It is known that s3a works
 really well with parquet which is available in 2.7. They fixed lot of
 issues related to metadata reading there...
 On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 @Cheng, Hao : Physical plans show that it got stuck on scanning S3!

 (table is partitioned by date_prefix and hour)
 explain select count(*) from test_table where date_prefix='20150819'
 and hour='00';

 TungstenAggregate(key=[],
 value=[(count(1),mode=Final,isDistinct=false)]
  TungstenExchange SinglePartition
   TungstenAggregate(key=[],
 value=[(count(1),mode=Partial,isDistinct=false)]
Scan ParquetRelation[ .. about 1000 partition paths go here ]

 Why does spark have to scan all partitions when the query only
 concerns with 1 partitions? Doesn't it defeat the purpose of partitioning?

 Thanks!

 On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver 
 philip.wea...@gmail.com wrote:

 I hadn't heard of spark.sql.sources.partitionDiscovery.enabled
 before, and I couldn't find much information about it online. What does 
 it
 mean exactly to disable it? Are there any negative consequences to
 disabling it?

 On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com
 wrote:

 Can you make some more profiling? I am wondering if the driver is
 busy with scanning the HDFS / S3.

 Like jstack pid of driver process



 And also, it’s will be great if you can paste the physical plan for
 the simple query.



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 1:46 PM
 *To:* Cheng, Hao
 *Cc:* Philip Weaver; user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot
 of partitions



 I cloned from TOT after 1.5.0 cut off. I noticed there were a couple
 of CLs trying to speed up spark sql with tables with a huge number of
 partitions, I've made sure that those CLs are included but it's still 
 very
 slow



 On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com
 wrote:

 Yes, you can try set the
 spark.sql.sources.partitionDiscovery.enabled to false.



 BTW, which version are you using?



 Hao



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 12:16 PM
 *To:* Philip Weaver
 *Cc:* user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot
 of partitions



 I guess the question is why does spark have to do partition
 discovery with all partitions when the query only needs to look at one
 partition? Is there a conf flag to turn this off?



 On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver 
 philip.wea...@gmail.com wrote:

 I've had the same problem. It turns out that Spark (specifically
 parquet) is very slow at partition discovery. It got better in 1.5 (not 
 yet
 released), but was still unacceptably slow. Sadly, we ended up reading
 parquet files manually in Python (via C++) and had to abandon Spark SQL
 because of this problem.



 On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang 
 jerrickho...@gmail.com wrote:

 Hi all,



 I did a simple experiment with Spark SQL. I created a partitioned
 parquet table with only one partition (date=20140701). A simple `select
 count(*) from table where date=20140701` would run very fast (0.1 
 seconds).
 However, as I 

Re: Spark GraphaX

2015-08-23 Thread Robineast
GraphX is a graph analytics engine rather than a graph database. Its typical
use case is running large-scale graph algorithms like PageRank, connected
components, label propagation and so on. It can be an element of complex
processing pipelines that involve other Spark components such as DataFrames,
machine learning and Spark Streaming.

If you need to store, update and query graph structures you might be better
served looking at Neo4j or Titan. If you still need the analytics capability
you can integrate Spark with the database.
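
[Editor's note] As a small illustration of the analytics use case described above, a minimal
sketch (Scala), assuming an existing SparkContext sc; the edge-list path is hypothetical.

    import org.apache.spark.graphx.GraphLoader

    val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt")

    // PageRank until convergence, then print the ten highest-ranked vertices.
    val ranks = graph.pageRank(0.0001).vertices
    ranks.sortBy(_._2, ascending = false).take(10).foreach(println)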



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-GraphaX-tp24408p24411.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-23 Thread Michael Armbrust
We should not be actually scanning all of the data of all of the
partitions, but we do need to at least list all of the available
directories so that we can apply your predicates to the actual values that
are present when we are deciding which files need to be read in a given
spark job.  While this is a somewhat expensive operation, we do it in
parallel and we cache this information when you access the same relation
more than once.

Can you provide a little more detail about how exactly you are accessing
the parquet data (are you using sqlContext.read or creating persistent
tables in the metastore?), and how long it is taking?  It would also be
good to know how many partitions we are talking about and how much data is
in each.  Finally, I'd like to see the stacktrace where it is hanging to
make sure my above assertions are correct.

We have several tables internally that have 1000s of partitions and while
it takes ~1 minute initially to discover the metadata, after that we are
able to query the data interactively.
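
[Editor's note] A hedged sketch (Scala; paths and table names are hypothetical) of the two access
patterns asked about above, plus the discovery flag mentioned earlier in the thread; behavior
may vary by Spark version.

    // Direct read of the partitioned data:
    val viaRead  = sqlContext.read.parquet("s3://bucket/test_table")
    // Or via a persistent metastore table:
    val viaTable = sqlContext.table("test_table")

    // The flag suggested earlier in the thread:
    sqlContext.setConf("spark.sql.sources.partitionDiscovery.enabled", "false")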



On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang jerrickho...@gmail.com
wrote:

 anybody has any suggestions?

 On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 Is there a workaround without updating Hadoop? Would really appreciate if
 someone can explain what spark is trying to do here and what is an easy way
 to turn this off. Thanks all!

 On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 Did you try with hadoop version 2.7.1 .. It is known that s3a works
 really well with parquet which is available in 2.7. They fixed lot of
 issues related to metadata reading there...
 On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 @Cheng, Hao : Physical plans show that it got stuck on scanning S3!

 (table is partitioned by date_prefix and hour)
 explain select count(*) from test_table where date_prefix='20150819'
 and hour='00';

 TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
  TungstenExchange SinglePartition
   TungstenAggregate(key=[],
 value=[(count(1),mode=Partial,isDistinct=false)]
Scan ParquetRelation[ .. about 1000 partition paths go here ]

 Why does spark have to scan all partitions when the query only concerns
 with 1 partitions? Doesn't it defeat the purpose of partitioning?

 Thanks!

 On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com
  wrote:

 I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before,
 and I couldn't find much information about it online. What does it mean
 exactly to disable it? Are there any negative consequences to disabling 
 it?

 On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com
 wrote:

 Can you make some more profiling? I am wondering if the driver is
 busy with scanning the HDFS / S3.

 Like jstack pid of driver process



 And also, it’s will be great if you can paste the physical plan for
 the simple query.



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 1:46 PM
 *To:* Cheng, Hao
 *Cc:* Philip Weaver; user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I cloned from TOT after 1.5.0 cut off. I noticed there were a couple
 of CLs trying to speed up spark sql with tables with a huge number of
 partitions, I've made sure that those CLs are included but it's still 
 very
 slow



 On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com
 wrote:

 Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled
 to false.



 BTW, which version are you using?



 Hao



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 12:16 PM
 *To:* Philip Weaver
 *Cc:* user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I guess the question is why does spark have to do partition discovery
 with all partitions when the query only needs to look at one partition? 
 Is
 there a conf flag to turn this off?



 On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver 
 philip.wea...@gmail.com wrote:

 I've had the same problem. It turns out that Spark (specifically
 parquet) is very slow at partition discovery. It got better in 1.5 (not 
 yet
 released), but was still unacceptably slow. Sadly, we ended up reading
 parquet files manually in Python (via C++) and had to abandon Spark SQL
 because of this problem.



 On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang 
 jerrickho...@gmail.com wrote:

 Hi all,



 I did a simple experiment with Spark SQL. I created a partitioned
 parquet table with only one partition (date=20140701). A simple `select
 count(*) from table where date=20140701` would run very fast (0.1 
 seconds).
 However, as I added more partitions the query takes longer and longer. 
 When
 I added about 10,000 partitions, the query took way too long. I feel like
 querying for a single partition should not be affected by having more
 

DataFrame rollup with alias?

2015-08-23 Thread Isabelle Phan
Hello,

I am new to Spark and just running some tests to get familiar with the APIs.

When calling the rollup function on my DataFrame, I get different results
when I alias the columns I am grouping on (see below for an example data set).
I was expecting the alias function to only affect the column name. Why is it
also affecting the rollup results?
(I know I can rename my columns after the rollup call, using the
withColumnRenamed function; my question is just to get a better understanding
of the alias function.)
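
[Editor's note] For reference, a minimal sketch of that withColumnRenamed workaround, using the
column names from the example below.

    val rolled = df.rollup(df("Name"), df("Game")).sum()
      .withColumnRenamed("Name", "Player")
      .withColumnRenamed("Game", "Round")
      .orderBy("Player", "Round")

    rolled.show()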

scala> df.show
++--+-+
|Name|  Game|Score|
++--+-+
| Bob|Game 1|   20|
| Bob|Game 2|   30|
| Lea|Game 1|   25|
| Lea|Game 2|   30|
| Ben|Game 1|5|
| Ben|Game 3|   35|
| Bob|Game 3|   15|
++--+-+

//rollup results as expected
scala> df.rollup(df("Name"), df("Game")).sum().orderBy("Name", "Game").show
++--+--+
|Name|  Game|SUM(Score)|
++--+--+
|null|  null|   160|
| Ben|  null|40|
| Ben|Game 1| 5|
| Ben|Game 3|35|
| Bob|  null|65|
| Bob|Game 1|20|
| Bob|Game 2|30|
| Bob|Game 3|15|
| Lea|  null|55|
| Lea|Game 1|25|
| Lea|Game 2|30|
++--+--+

//rollup with aliases returns strange results
scala> df.rollup(df("Name") as "Player", df("Game") as "Round").sum().orderBy("Player", "Round").show
+--+--+--+
|Player| Round|SUM(Score)|
+--+--+--+
|   Ben|Game 1| 5|
|   Ben|Game 1| 5|
|   Ben|Game 1| 5|
|   Ben|Game 3|35|
|   Ben|Game 3|35|
|   Ben|Game 3|35|
|   Bob|Game 1|20|
|   Bob|Game 1|20|
|   Bob|Game 1|20|
|   Bob|Game 2|30|
|   Bob|Game 2|30|
|   Bob|Game 2|30|
|   Bob|Game 3|15|
|   Bob|Game 3|15|
|   Bob|Game 3|15|
|   Lea|Game 1|25|
|   Lea|Game 1|25|
|   Lea|Game 1|25|
|   Lea|Game 2|30|
|   Lea|Game 2|30|
+--+--+--+


Thanks in advance for your help,

Isabelle