Re: migration from Teradata to Spark SQL

2016-05-03 Thread Deepak Sharma
Hi Tapan
I would suggest an architecture where you have separate storage and
data serving layers.
Spark is still best for batch processing of data.
So what I am suggesting here is that you store your data as it is in
some HDFS raw layer, run your ELT in Spark on this raw data, and then
store the processed/transformed data in some NoSQL DB such as Cassandra,
which can serve the data and handle a large number of queries for you.
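
For illustration, a minimal sketch of that flow from the spark-shell (this
assumes the DataStax spark-cassandra-connector is on the classpath; the path,
keyspace and table names below are only placeholders):

// Raw layer: read the data as it was landed in HDFS
val raw = sqlContext.read.json("hdfs:///data/raw/events/")

// Example ELT step in Spark
val transformed = raw
  .filter("event_type = 'purchase'")
  .groupBy("customer_id")
  .count()

// Serving layer: write the transformed data to a Cassandra table
transformed.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "serving", "table" -> "purchase_counts"))
  .mode("append")
  .save()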

Thanks
Deepak

On Wed, May 4, 2016 at 6:59 AM, Tapan Upadhyay  wrote:

> Hi,
>
> We are planning to move our ad hoc queries from Teradata to Spark. We have
> a huge volume of queries during the day. What is the best way to go about it -
>
> 1) Read data directly from the Teradata DB using Spark JDBC
>
> 2) Import data using Sqoop via EOD jobs into Hive tables stored as Parquet
> and then run queries on the Hive tables using Spark SQL or the Spark Hive context.
>
> Are there any other ways through which we can do this better/more efficiently?
>
> Please guide.
>
> Regards,
> Tapan
>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: spark 1.6.1 build failure of : scala-maven-plugin

2016-05-03 Thread Divya Gehlot
Hi,
I am getting a similar error:
Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile
when I tried to build the Phoenix project using Maven.
Maven version: 3.3
Java version: 1.7_67
Phoenix: downloaded latest master from GitHub
If anybody finds the resolution, please share.


Thanks,
Divya

On 3 May 2016 at 10:18, sunday2000 <2314476...@qq.com> wrote:

> [INFO]
> 
> [INFO] BUILD FAILURE
> [INFO]
> 
> [INFO] Total time: 14.765 s
> [INFO] Finished at: 2016-05-03T10:08:46+08:00
> [INFO] Final Memory: 35M/191M
> [INFO]
> 
> [ERROR] Failed to execute goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first)
> on project spark-test-tags_2.10: Execution scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed. CompileFailed
> -> [Help 1]
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute
> goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile
> (scala-compile-first) on project spark-test-tags_2.10: Execution
> scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:224)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> at
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
> at
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
> at
> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
> at
> org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)
> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)
> at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)
> at org.apache.maven.cli.MavenCli.execute(MavenCli.java:862)
> at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:286)
> at org.apache.maven.cli.MavenCli.main(MavenCli.java:197)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
> Caused by: org.apache.maven.plugin.PluginExecutionException: Execution
> scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
> at
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:145)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> ... 20 more
> Caused by: Compile failed via zinc server
> at
> sbt_inc.SbtIncrementalCompiler.zincCompile(SbtIncrementalCompiler.java:136)
> at
> sbt_inc.SbtIncrementalCompiler.compile(SbtIncrementalCompiler.java:86)
> at
> scala_maven.ScalaCompilerSupport.incrementalCompile(ScalaCompilerSupport.java:303)
> at
> scala_maven.ScalaCompilerSupport.compile(ScalaCompilerSupport.java:119)
> at
> scala_maven.ScalaCompilerSupport.doExecute(ScalaCompilerSupport.java:99)
> at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:482)
> at
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
> ... 21 more
> [ERROR]
> [ERROR]
> [ERROR] For more information about the errors and possible solutions,
> please read the following articles:
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException
> [ERROR]
> [ERROR] After correcting the problems, you can resume the build with the
> command
> [ERROR]   mvn  -rf :spark-test-tags_2.10


Re: parquet table in spark-sql

2016-05-03 Thread 喜之郎
Thanks for your answer, Sandeep.
And also thanks, Varadharajan.




-- Original Message --
From: "Sandeep Nemuri";;
Sent: Tuesday, May 3, 2016, 8:48 PM
To: "Varadharajan Mukundan"; 
Cc: "喜之郎"<251922...@qq.com>; "user"; 
Subject: Re: parquet table in spark-sql



We don't need any delimiters for the Parquet file format.



On Tue, May 3, 2016 at 5:31 AM, Varadharajan Mukundan  
wrote:
Hi,

Yes, it is not needed. Delimiters are needed only for text files.
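
For example, a Parquet-backed table can be declared without any delimiter
clauses at all (a minimal sketch through a HiveContext; the table and column
names are only placeholders):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Parquet carries its own schema, so no ROW FORMAT / delimiter clauses are needed
hiveContext.sql("""
  CREATE TABLE IF NOT EXISTS call_stats (
    caller_id       STRING,
    total_duration  BIGINT
  )
  STORED AS PARQUET
""")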


On Tue, May 3, 2016 at 12:49 PM, 喜之郎 <251922...@qq.com> wrote:
Hi, I want to ask a question about Parquet tables in spark-sql.


I think that Parquet has schema information in its own file,
so you don't need to define a row separator and column separator in the
create-table DDL, like this:


total_duration  BigInt)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
  LINES TERMINATED BY '\n' 



Can anyone give me an answer? Thanks.











-- 
Thanks,
M. Varadharajan



"Experience is what you get when you didn't get what you wanted"
   -By Prof. Randy Pausch in "The Last Lecture"

My Journal :- http://varadharajan.in
 
 




-- 
  Regards  Sandeep Nemuri

Re: spark 1.6.1 build failure of : scala-maven-plugin

2016-05-03 Thread Ted Yu
Which version are you using now?

I wonder if 1.8.0_91 had a problem.

Cheers

On Tue, May 3, 2016 at 6:29 PM, sunday2000 <2314476...@qq.com> wrote:

> Problem solved, by using a newer version of javac:
> [INFO]
> 
> [INFO] BUILD SUCCESS
> [INFO]
> 
> [INFO] Total time: 02:07 h
> [INFO] Finished at: 2016-05-03T19:51:15+08:00
> [INFO] Final Memory: 392M/1520M
> [INFO]
> 
>
>
> -- Original Message --
> *From:* "sunday2000";<2314476...@qq.com>;
> *Sent:* Tuesday, May 3, 2016, 11:41 AM
> *To:* "Ted Yu";
> *Cc:* "user";
> *Subject:* Re: spark 1.6.1 build failure of : scala-maven-plugin
>
> I use this command:
>
> build/mvn clean -Phive -Phive-thriftserver -Pyarn -Phadoop-2.6 -Psparkr
> -Dhadoop.version=2.7.0 package -DskipTests -X
>
> and get this failure message:
> [INFO]
> 
> [INFO] BUILD FAILURE
> [INFO]
> 
> [INFO] Total time: 14.567 s
> [INFO] Finished at: 2016-05-03T11:40:32+08:00
> [INFO] Final Memory: 42M/192M
> [INFO]
> 
> [ERROR] Failed to execute goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first)
> on project spark-test-tags_2.10: Execution scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed. CompileFailed
> -> [Help 1]
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute
> goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile
> (scala-compile-first) on project spark-test-tags_2.10: Execution
> scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:224)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> at
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
> at
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
> at
> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
> at
> org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)
> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)
> at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)
> at org.apache.maven.cli.MavenCli.execute(MavenCli.java:862)
> at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:286)
> at org.apache.maven.cli.MavenCli.main(MavenCli.java:197)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
> Caused by: org.apache.maven.plugin.PluginExecutionException: Execution
> scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
> at
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:145)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> ... 20 more
> Caused by: javac returned nonzero exit code
> at
> sbt.compiler.JavaCompiler$JavaTool0.compile(JavaCompiler.scala:77)
> at sbt.compiler.JavaTool$class.apply(JavaCompiler.scala:35)
> at sbt.compiler.JavaCompiler$JavaTool0.apply(JavaCompiler.scala:63)
> at sbt.compiler.JavaCompiler$class.compile(JavaCompiler.scala:21)
> at
> sbt.compiler.JavaCompiler$JavaTool0.compile(JavaCompiler.scala:63)
> at
> sbt.compiler.AggressiveCompile$$anonfun$3$$anonfun$compileJava$1$1.apply$mcV$sp(AggressiveCompile.scala:127)
> at
> sbt.compiler.AggressiveCompile$$anonfun$3$$anonfun$compileJava$1$1.apply(AggressiveCompile.scala:127)
> at
> 

Re: spark 1.6.1 build failure of : scala-maven-plugin

2016-05-03 Thread sunday2000
Problem solved, by using a newer version of javac:
 [INFO] 
[INFO] BUILD SUCCESS
[INFO] 
[INFO] Total time: 02:07 h
[INFO] Finished at: 2016-05-03T19:51:15+08:00
[INFO] Final Memory: 392M/1520M
[INFO] 
  

 

 -- Original Message --
 From: "sunday2000";<2314476...@qq.com>;
 Sent: Tuesday, May 3, 2016, 11:41 AM
 To: "Ted Yu"; 
 Cc: "user"; 
 Subject: Re: spark 1.6.1 build failure of : scala-maven-plugin

 

 
  I use this command:
 

 build/mvn clean -Phive -Phive-thriftserver -Pyarn -Phadoop-2.6 -Psparkr 
-Dhadoop.version=2.7.0 package -DskipTests -X
 

 and get this failure message:
 [INFO] 
[INFO] BUILD FAILURE
[INFO] 
[INFO] Total time: 14.567 s
[INFO] Finished at: 2016-05-03T11:40:32+08:00
[INFO] Final Memory: 42M/192M
[INFO] 
[ERROR] Failed to execute goal 
net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first) on 
project spark-test-tags_2.10: Execution scala-compile-first of goal 
net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed. CompileFailed -> 
[Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal 
net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first) on 
project spark-test-tags_2.10: Execution scala-compile-first of goal 
net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:224)
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
at 
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
at 
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
at 
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
at 
org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)
at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)
at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)
at org.apache.maven.cli.MavenCli.execute(MavenCli.java:862)
at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:286)
at org.apache.maven.cli.MavenCli.main(MavenCli.java:197)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
Caused by: org.apache.maven.plugin.PluginExecutionException: Execution 
scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile 
failed.
at 
org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:145)
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
... 20 more
Caused by: javac returned nonzero exit code
at sbt.compiler.JavaCompiler$JavaTool0.compile(JavaCompiler.scala:77)
at sbt.compiler.JavaTool$class.apply(JavaCompiler.scala:35)
at sbt.compiler.JavaCompiler$JavaTool0.apply(JavaCompiler.scala:63)
at sbt.compiler.JavaCompiler$class.compile(JavaCompiler.scala:21)
at sbt.compiler.JavaCompiler$JavaTool0.compile(JavaCompiler.scala:63)
at 
sbt.compiler.AggressiveCompile$$anonfun$3$$anonfun$compileJava$1$1.apply$mcV$sp(AggressiveCompile.scala:127)
at 
sbt.compiler.AggressiveCompile$$anonfun$3$$anonfun$compileJava$1$1.apply(AggressiveCompile.scala:127)
at 
sbt.compiler.AggressiveCompile$$anonfun$3$$anonfun$compileJava$1$1.apply(AggressiveCompile.scala:127)
at 
sbt.compiler.AggressiveCompile.sbt$compiler$AggressiveCompile$$timed(AggressiveCompile.scala:166)
at 

migration from Teradata to Spark SQL

2016-05-03 Thread Tapan Upadhyay
Hi,

We are planning to move our ad hoc queries from Teradata to Spark. We have
a huge volume of queries during the day. What is the best way to go about it -

1) Read data directly from the Teradata DB using Spark JDBC (see the sketch
after this message)

2) Import data using Sqoop via EOD jobs into Hive tables stored as Parquet
and then run queries on the Hive tables using Spark SQL or the Spark Hive context.

Are there any other ways through which we can do this better/more efficiently?

Please guide.

Regards,
Tapan
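
For option 1 above, a minimal sketch of reading a Teradata table over JDBC from
the spark-shell (the URL, driver class, credentials and table name are only
placeholders, and the Teradata JDBC driver jar has to be on the classpath):

// Expose a Teradata table to Spark SQL through the JDBC data source
val tdDF = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:teradata://td-host/DATABASE=sales")
  .option("driver", "com.teradata.jdbc.TeraDriver")
  .option("dbtable", "sales.transactions")
  .option("user", "etl_user")
  .option("password", "etl_password")
  .load()

// Query it like any other table
tdDF.registerTempTable("transactions")
sqlContext.sql("SELECT COUNT(*) FROM transactions").show()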


Re: Bit(N) on create Table with MSSQLServer

2016-05-03 Thread Mich Talebzadeh
Can you create the MSSQL (target) table first with the correct column
settings and insert data from Spark into it with JDBC, as opposed to JDBC
creating the target table itself?
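
Alternatively, the JDBC type mapping itself can be overridden by registering a
custom dialect (a minimal sketch of the approach; which SQL type strings to
emit is up to the target database, so treat the choices below as placeholders):

import java.sql.Types
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._

// Hypothetical dialect that changes the types Spark emits when it creates
// the target table over JDBC against SQL Server
object MsSqlServerDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case BooleanType => Some(JdbcType("BIT", Types.BIT))              // instead of BIT(1)
    case StringType  => Some(JdbcType("VARCHAR(MAX)", Types.VARCHAR)) // instead of TEXT
    case _           => None                                          // fall back to the defaults
  }
}

JdbcDialects.registerDialect(MsSqlServerDialect)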



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 3 May 2016 at 22:19, Andrés Ivaldi  wrote:

> OK, the Spark MSSQL data type mapping is not right for me, i.e. string is TEXT
> instead of VARCHAR(MAX), so how can I override the default SQL mapping?
>
> regards.
>
> On Sun, May 1, 2016 at 5:23 AM, Mich Talebzadeh  > wrote:
>
>> Well if MSSQL cannot create that column then it is more of a
>> compatibility issue between Spark and the RDBMS.
>>
>> What value does that column have in MSSQL? Can you create the table in the
>> MSSQL database or map it in Spark to a valid column before opening the JDBC
>> connection?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 29 April 2016 at 16:16, Andrés Ivaldi  wrote:
>>
>>> Hello, Spark is executing a CREATE TABLE statement (using JDBC) against
>>> MSSQLServer with a column type mapping like ColName Bit(1) for boolean
>>> types. This CREATE TABLE cannot be executed on MSSQLServer.
>>>
>>> In the JdbcDialect class the mapping for the Boolean type is Bit(1), so the
>>> question is: is this a problem with Spark or with the JDBC driver that is not
>>> mapping correctly?
>>>
>>> Anyway, is it possible to override that mapping in Spark?
>>>
>>> Regards
>>>
>>> --
>>> Ing. Ivaldi Andres
>>>
>>
>>
>
>
> --
> Ing. Ivaldi Andres
>


RE: Multiple Spark Applications that use Cassandra, how to share resources/nodes

2016-05-03 Thread Mohammed Guller
You can run multiple Spark applications simultaneously. Just limit the # of 
cores and memory allocated to each application. For example, if each node has 8 
cores and there are 10 nodes and you want to be able to run 4 applications 
simultaneously, limit the # of cores for each application to 20. Similarly, you 
can limit the amount of memory that an application can use on each node.

You can also use dynamic resource allocation.
Details are here: 
http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
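
For example, a per-application cap can be set when the SparkConf is built (a
minimal sketch; the values are only illustrative, and spark.cores.max applies
to standalone and coarse-grained Mesos modes):

import org.apache.spark.{SparkConf, SparkContext}

// Cap this application so that four of them fit on a 10-node x 8-core cluster
val conf = new SparkConf()
  .setAppName("capped-app")
  .set("spark.cores.max", "20")        // at most 20 cores across the whole cluster
  .set("spark.executor.memory", "4g")  // memory per executor on each node

val sc = new SparkContext(conf)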

Mohammed
Author: Big Data Analytics with 
Spark

From: Tobias Eriksson [mailto:tobias.eriks...@qvantel.com]
Sent: Tuesday, May 3, 2016 7:34 AM
To: user@spark.apache.org
Subject: Multiple Spark Applications that use Cassandra, how to share 
resources/nodes

Hi
 We are using Spark for a long-running job; in fact it is a REST server that 
does some joins with some tables in Cassandra and returns the result.
Now we need to have multiple applications running in the same Spark cluster, 
and from what I understand this is not possible, or should I say somewhat 
complicated:

  1.  A Spark application takes all the resources / nodes in the cluster (we 
have 4 nodes, one for each Cassandra node)
  2.  A Spark application returns its resources when it is done (exits or the 
context is closed/returned)
  3.  Sharing resources using Mesos only allows scaling down and then scaling 
up by a step-by-step policy, i.e. 2 nodes, 3 nodes, 4 nodes, … and increases as 
the need increases
But if this is true, I cannot have several applications running in parallel, 
is that true?
If I use Mesos then the whole idea of one Spark worker per Cassandra node 
fails, as it talks directly to a node, and that is why it is so efficient.
In this case I need all nodes, not 3 out of 4.

Any mistakes in my thinking?
Any ideas on how to solve this? It should be a common problem, I think.

-Tobias




Re: Alternative to groupByKey() + mapValues() for non-commutative, non-associative aggregate?

2016-05-03 Thread Kevin Mellott
If you put this into a DataFrame then you may be able to use one-hot
encoding and treat these as categorical features. I believe that the ml
pipeline components use Project Tungsten, so the performance will be very
fast. After you process the result on the DataFrame you would then need to
assemble your desired format.
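
A minimal sketch of that idea with the ml feature transformers (the input
DataFrame and the column names below are only placeholders):

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

// proceduresDF is assumed to have a string column "procedure", one row per
// (patient, procedure) combination
val indexer = new StringIndexer()
  .setInputCol("procedure")
  .setOutputCol("procedureIndex")

val encoder = new OneHotEncoder()
  .setInputCol("procedureIndex")
  .setOutputCol("procedureVec")

val indexed = indexer.fit(proceduresDF).transform(proceduresDF)
val encoded = encoder.transform(indexed)  // sparse one-hot vector per row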
On May 3, 2016 4:29 PM, "Bibudh Lahiri"  wrote:

> Hi,
>   I have multiple procedure codes that a patient has undergone, in an RDD
> with a different row for each combination of patient and procedure. I am
> trying to convert this data to the LibSVM format, so that the resultant
> looks as follows:
>
>   "0 1:1 2:0 3:1 29:1 30:1 32:1 110:1"
>
>   where 1, 2, 3, 29, 30, 32, 110 are numeric procedure codes a given
> patient has undergone. Note that Spark needs these codes to be one-based
> and in ascending order, so I am using a combination of groupByKey() and
> mapValues() to do this conversion as follows:
>
> procedures_rdd = procedures_rdd.groupByKey().mapValues(combine_procedures)
>
> where combine_procedures() is defined as:
>
> def combine_procedures(l_procs):
>   ascii_procs = map(lambda x: int(custom_encode(x)), l_procs)
>   return ' '.join([str(x) + ":1" for x in sorted(ascii_procs)])
>
>   Note that this reduction is neither commutative nor associative, since
> combining "29:1 30:1" and "32:1 110:1" to "32:1 110:1 29:1 30:1" is not
> going to work.
>   Can someone suggest some faster alternative to the combination
> of groupByKey() and mapValues() for this?
>
> Thanks
>Bibudh
>
>
> --
> Bibudh Lahiri
> Senior Data Scientist, Impetus Technologies
> 720 University Avenue, Suite 130
> Los Gatos, CA 95129
> http://knowthynumbers.blogspot.com/
>
>


Alternative to groupByKey() + mapValues() for non-commutative, non-associative aggregate?

2016-05-03 Thread Bibudh Lahiri
Hi,
  I have multiple procedure codes that a patient has undergone, in an RDD
with a different row for each combination of patient and procedure. I am
trying to convert this data to the LibSVM format, so that the resultant
looks as follows:

  "0 1:1 2:0 3:1 29:1 30:1 32:1 110:1"

  where 1, 2, 3, 29, 30, 32, 110 are numeric procedure codes a given
patient has undergone. Note that Spark needs these codes to be one-based
and in ascending order, so I am using a combination of groupByKey() and
mapValues() to do this conversion as follows:

procedures_rdd = procedures_rdd.groupByKey().mapValues(combine_procedures)

where combine_procedures() is defined as:

def combine_procedures(l_procs):
  ascii_procs = map(lambda x: int(custom_encode(x)), l_procs)
  return ' '.join([str(x) + ":1" for x in sorted(ascii_procs)])

  Note that this reduction is neither commutative nor associative, since
combining "29:1 30:1" and "32:1 110:1" to "32:1 110:1 29:1 30:1" is not
going to work.
  Can someone suggest some faster alternative to the combination
of groupByKey() and mapValues() for this?

Thanks
   Bibudh


-- 
Bibudh Lahiri
Senior Data Scientist, Impetus Technologies
720 University Avenue, Suite 130
Los Gatos, CA 95129
http://knowthynumbers.blogspot.com/


Re: Improving performance of a kafka spark streaming app

2016-05-03 Thread Colin Kincaid Williams
Thanks Cody, I can see that the partitions are well distributed...
Then I'm in the process of using the direct api.
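
For reference, a minimal sketch of the direct API in Scala (Spark 1.3+, using
the spark-streaming-kafka artifact; the broker list and topic name are only
placeholders):

import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("my-topic")

// One RDD partition per Kafka topic-partition, no receivers and no WAL
val stream = KafkaUtils.createDirectStream[
  Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topics)

stream.foreachRDD { rdd => println(s"batch size: ${rdd.count()}") }
ssc.start()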

On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger  wrote:
> 60 partitions in and of itself shouldn't be a big performance issue
> (as long as producers are distributing across partitions evenly).
>
> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams  wrote:
>> Thanks again Cody. Regarding the details 66 kafka partitions on 3
>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>> issue with the receiver was the large number of partitions. I had
>> miscounted the disks and so 11*3*2 is how I decided to partition my
>> topic on insertion, ( by my own, unjustified reasoning, on a first
>> attempt ) . This worked well enough for me, I put 1.7 billion entries
>> into Kafka on a map reduce job in 5 and a half hours.
>>
>> I was concerned using spark 1.5.2 because I'm currently putting my
>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library jars
>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>> yesterday, I tried building against 1.5.2. So far it's running without
>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
>> improvement using the same code, but I'll see how the direct api
>> handles it. In the end I can reduce the number of partitions in Kafka
>> if it causes big performance issues.
>>
>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger  wrote:
>>> print() isn't really the best way to benchmark things, since it calls
>>> take(10) under the covers, but 380 records / second for a single
>>> receiver doesn't sound right in any case.
>>>
>>> Am I understanding correctly that you're trying to process a large
>>> number of already-existing kafka messages, not keep up with an
>>> incoming stream?  Can you give any details (e.g. hardware, number of
>>> topicpartitions, etc)?
>>>
>>> Really though, I'd try to start with spark 1.6 and direct streams, or
>>> even just kafkacat, as a baseline.
>>>
>>>
>>>
>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams  
>>> wrote:
 Hello again. I searched for "backport kafka" in the list archives but
 couldn't find anything but a post from Spark 0.7.2 . I was going to
 use accumulators to make a counter, but then saw on the Streaming tab
 the Receiver Statistics. Then I removed all other "functionality"
 except:


 JavaPairReceiverInputDStream dstream = KafkaUtils
   //createStream(JavaStreamingContext jssc,Class
 keyTypeClass,Class valueTypeClass, Class keyDecoderClass,
 Class valueDecoderClass, java.util.Map kafkaParams,
 java.util.Map topics, StorageLevel storageLevel)
   .createStream(jssc, byte[].class, byte[].class,
 kafka.serializer.DefaultDecoder.class,
 kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
 StorageLevel.MEMORY_AND_DISK_SER());

dstream.print();

 Then in the Recieiver Stats for the single receiver, I'm seeing around
 380 records / second. Then to get anywhere near my 10% mentioned
 above, I'd need to run around 21 receivers, assuming 380 records /
 second, just using the print output. This seems awfully high to me,
 considering that I wrote 8+ records a second to Kafka from a
 mapreduce job, and that my bottleneck was likely Hbase. Again using
 the 380 estimate, I would need 200+ receivers to reach a similar
 amount of reads.

 Even given the issues with the 1.2 receivers, is this the expected way
 to use the Kafka streaming API, or am I doing something terribly
 wrong?

 My application looks like
 https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877

 On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger  wrote:
> Have you tested for read throughput (without writing to hbase, just
> deserialize)?
>
> Are you limited to using spark 1.2, or is upgrading possible?  The
> kafka direct stream is available starting with 1.3.  If you're stuck
> on 1.2, I believe there have been some attempts to backport it, search
> the mailing list archives.
>
> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams  
> wrote:
>> I've written an application to get content from a kafka topic with 1.7
>> billion entries,  get the protobuf serialized entries, and insert into
>> hbase. Currently the environment that I'm running in is Spark 1.2.
>>
>> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
>> 0-2500 writes / second. This will take much too long to consume the
>> entries.
>>
>> I currently believe that the spark kafka receiver is the bottleneck.
>> I've tried both 1.2 receivers, with the WAL and without, and didn't
>> notice any large performance 

Calculating log-loss for the trained model in Spark ML

2016-05-03 Thread Abhishek Anand
I am building an ML pipeline for logistic regression.

val lr = new LogisticRegression()

lr.setMaxIter(100).setRegParam(0.001)

val pipeline = new
Pipeline().setStages(Array(geoDimEncoder,clientTypeEncoder,
   devTypeDimIdEncoder,pubClientIdEncoder,tmpltIdEncoder,
   hourEncoder,assembler,lr))

val model = pipeline.fit(trainingDF)

Now, when the model is trained, I want to see the
probability values for the training set and compute certain
validation parameters like log-loss. But I am unable to find
this using "model".

The only thing I could find is

model.transform(testDF).select()

Can I not get the metrics on the training set for training-set validation?
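
One way to get there is to score the training set with the fitted pipeline and
compute the log-loss by hand (a minimal sketch that assumes the default "label"
and "probability" column names and a binary 0/1 label):

import org.apache.spark.mllib.linalg.Vector

val scored = model.transform(trainingDF).select("label", "probability")

val eps = 1e-15
val logLoss = scored.rdd.map { row =>
  val y = row.getDouble(0)
  // probability vector element 1 is P(label = 1); clip it away from 0 and 1
  val p = math.max(eps, math.min(1 - eps, row.getAs[Vector](1)(1)))
  -(y * math.log(p) + (1 - y) * math.log(1 - p))
}.mean()

println(s"training log-loss: $logLoss")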

Thanks !!


Re: Error while running jar using spark-submit on another machine

2016-05-03 Thread nsalian
Thank you for the question.
What is different on this machine as compared to the ones where the job
succeeded?





-
Neelesh S. Salian
Cloudera
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-jar-using-spark-submit-on-another-machine-tp26869p26875.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: a question about --executor-cores

2016-05-03 Thread nsalian
Hello,

Thank you for posting the question.
To begin I do have a few questions.
1) What is the size of the YARN installation? How many NodeManagers? 

 
2) Notes to Remember:
Container Virtual CPU Cores
yarn.nodemanager.resource.cpu-vcores
>> Number of virtual CPU cores that can be allocated for containers.

Container Virtual CPU Cores Maximum
yarn.scheduler.maximum-allocation-vcores
>>  The largest number of virtual CPU cores that can be requested for a
>> container.


For executor-cores:
Every Spark executor in an application has the same fixed number of cores
and same fixed heap size. The number of cores can be specified with the
--executor-cores flag when invoking spark-submit, spark-shell, and pyspark
from the command line, or by setting the spark.executor.cores property in
the spark-defaults.conf file or on a SparkConf object. 

Similarly, the heap size can be controlled with the --executor-memory flag
or the spark.executor.memory property. The cores property controls the
number of concurrent tasks an executor can run. --executor-cores 5 means
that each executor can run a maximum of five tasks at the same time. The
memory property impacts the amount of data Spark can cache, as well as the
maximum sizes of the shuffle data structures used for grouping,
aggregations, and joins.


Imagine a cluster with six nodes running NodeManagers, each equipped with 16
cores and 64GB of memory. The NodeManager capacities,
yarn.nodemanager.resource.memory-mb and
yarn.nodemanager.resource.cpu-vcores, should probably be set to 63 * 1024 =
64512 (megabytes) and 15 respectively. We avoid allocating 100% of the
resources to YARN containers because the node needs some resources to run
the OS and Hadoop daemons. In this case, we leave a gigabyte and a core for
these system processes. Cloudera Manager helps by accounting for these and
configuring these YARN properties automatically.

The likely first impulse would be to use --num-executors 6 --executor-cores
15 --executor-memory 63G. However, this is the wrong approach because:

63GB + the executor memory overhead won’t fit within the 63GB capacity of
the NodeManagers.
The application master will take up a core on one of the nodes, meaning that
there won’t be room for a 15-core executor on that node.
15 cores per executor can lead to bad HDFS I/O throughput.
A better option would be to use --num-executors 17 --executor-cores 5
--executor-memory 19G. Why?

This config results in three executors on all nodes except for the one with
the AM, which will have two executors.
--executor-memory was derived as (63/3 executors per node) = 21.  21 * 0.07
= 1.47.  21 – 1.47 ~ 19.
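
Equivalently, as properties on a SparkConf (a minimal sketch mirroring the
example above; the same values can be passed as --num-executors,
--executor-cores and --executor-memory on the spark-submit command line):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.instances", "17")  // --num-executors
  .set("spark.executor.cores", "5")       // --executor-cores
  .set("spark.executor.memory", "19g")    // --executor-memory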


This is covered here:
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/





-
Neelesh S. Salian
Cloudera
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/a-question-about-executor-cores-tp26868p26874.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Creating new Spark context when running in Secure YARN fails

2016-05-03 Thread nsalian
Feel free to correct me if I am wrong.
But I believe this isn't a feature yet:
 "create a new Spark context within a single JVM process (driver)"

A few questions for you:

1) Is Kerberos set up correctly for you (the user)?
2) Could you please add the command/ code you are executing?
Checking to see if you provide a keytab and principal in your invocation.



-
Neelesh S. Salian
Cloudera
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Creating-new-Spark-context-when-running-in-Secure-YARN-fails-tp25361p26873.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Free memory while launching jobs.

2016-05-03 Thread mjordan79
I have a machine with an 8GB total memory, on which there are other
applications installed.

The Spark application must run 1 driver and two jobs at a time. I have
configured 8 cores in total.
The machine (without Spark) has 4GB of free RAM (the other half RAM is used
by other applications).

So I have configured 1 worker with a total of 2800MB of RAM. The
driver is configured with a 512MB limit (2 cores) and 762MB per executor.
The driver launches a driver process and a Spark Streaming (always on) job,
occupying 512MB + 762MB (using 4 cores in total).
The other jobs will use 762MB each, so when the whole app is started and
the 2 jobs (and the driver) are up, I should consume the whole 2.8GB of
memory.

Now, the free RAM. I said I have circa 4GB of RAM, so I should have 4 -
2.8 = 1.2GB of free RAM.
When the jobs start, however, I can see the free memory during execution is
near 200MB.


Why this behaviour? Why is Spark using practically all the available RAM if
I am using only 1 worker with a 2.8GB limit in total? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Free-memory-while-launching-jobs-tp26872.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Kevin Peng
Mike,

It looks like you are right.  The results seem to be fine.  It looks like I
messed up on the filtering clause.

sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE (s.date >= '2016-01-03' OR s.date IS NULL) AND (d.date >=
'2016-01-03' OR d.date IS NULL)").count()
res2: Long = 53042

Davies, Cesar, Gourav,

Thanks for the support.

KP

On Tue, May 3, 2016 at 11:26 AM, Michael Segel 
wrote:

> Silly question?
>
> If you change the predicate to
> ( s.date >= ‘2016-01-03’ OR s.date IS NULL )
> AND
> (d.date >= ‘2016-01-03’ OR d.date IS NULL)
>
> What do you get?
>
> Sorry if the syntax isn’t 100% correct. The idea is to not drop null
> values from the query.
> I would imagine that this shouldn’t kill performance since its most likely
> a post join filter on the result set?
> (Or is that just a Hive thing?)
>
> -Mike
>
> > On May 3, 2016, at 12:42 PM, Davies Liu  wrote:
> >
> > Bingo, the two predicates s.date >= '2016-01-03' AND d.date >=
> > '2016-01-03' are the root cause,
> > which will filter out all the nulls from the outer join and give the same
> > result as an inner join.
> >
> > In Spark 2.0, we turn these joins into inner joins actually.
> >
> > On Tue, May 3, 2016 at 9:50 AM, Cesar Flores  wrote:
> >> Hi
> >>
> >> Have you tried the joins without the where clause? When you use them
> you are
> >> filtering all the rows with null columns in those fields. In other
> words you
> >> are doing an inner join in all your queries.
> >>
> >> On Tue, May 3, 2016 at 11:37 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com>
> >> wrote:
> >>>
> >>> Hi Kevin,
> >>>
> >>> Having given it a first look I do think that you have hit something
> here
> >>> and this does not look quite fine. I have to work on the multiple AND
> >>> conditions in ON and see whether that is causing any issues.
> >>>
> >>> Regards,
> >>> Gourav Sengupta
> >>>
> >>> On Tue, May 3, 2016 at 8:28 AM, Kevin Peng  wrote:
> 
>  Davies,
> 
>  Here is the code that I am typing into the spark-shell along with the
>  results (my question is at the bottom):
> 
>  val dps =
>  sqlContext.read.format("com.databricks.spark.csv").option("header",
>  "true").load("file:///home/ltu/dps_csv/")
>  val swig =
>  sqlContext.read.format("com.databricks.spark.csv").option("header",
>  "true").load("file:///home/ltu/swig_csv/")
> 
>  dps.count
>  res0: Long = 42694
> 
>  swig.count
>  res1: Long = 42034
> 
> 
>  dps.registerTempTable("dps_pin_promo_lt")
>  swig.registerTempTable("swig_pin_promo_lt")
> 
>  sqlContext.sql("select * from dps_pin_promo_lt where date >
>  '2016-01-03'").count
>  res4: Long = 42666
> 
>  sqlContext.sql("select * from swig_pin_promo_lt where date >
>  '2016-01-03'").count
>  res5: Long = 34131
> 
>  sqlContext.sql("select distinct date, account, ad from
> dps_pin_promo_lt
>  where date > '2016-01-03'").count
>  res6: Long = 42533
> 
>  sqlContext.sql("select distinct date, account, ad from
> swig_pin_promo_lt
>  where date > '2016-01-03'").count
>  res7: Long = 34131
> 
> 
>  sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  ,
> d.account
>  AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>  d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
>  dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
> s.ad =
>  d.ad) WHERE s.date >= '2016-01-03' AND d.date >=
> '2016-01-03'").count()
>  res9: Long = 23809
> 
> 
>  sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  ,
> d.account
>  AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>  d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
>  dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
> s.ad =
>  d.ad) WHERE s.date >= '2016-01-03' AND d.date >=
> '2016-01-03'").count()
>  res10: Long = 23809
> 
> 
>  sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  ,
> d.account
>  AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>  d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
>  dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
> s.ad =
>  d.ad) WHERE s.date >= '2016-01-03' AND d.date >=
> '2016-01-03'").count()
>  res11: Long = 23809
> 
> 
>  sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  ,
> d.account
>  AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>  d.spend_in_dollar AS d_spend FROM 

Free memory while launching jobs.

2016-05-03 Thread Renato Perini
I have a machine with an 8GB total memory, on which there are other 
applications installed.


The Spark application must run 1 driver and two jobs at a time. I have 
configured 8 cores in total.
The machine (without Spark) has 4GB of free RAM (the other half RAM is 
used by other applications).


So I have configured 1 worker with a total of 2800MB of RAM. The 
driver is configured with a 512MB limit (2 cores) and 762MB per executor.
The driver launches a driver process and a Spark Streaming (always on) job, 
occupying 512MB + 762MB (using 4 cores in total).
The other jobs will use 762MB each, so when the whole app is started 
and the 2 jobs (and the driver) are up, I should consume the whole 2.8GB 
of memory.


Now, the free RAM. I said I have circa 4GB of RAM, so I should have 4 
- 2.8 = 1.2GB of free RAM.
When the jobs start, however, I can see the free memory during execution is 
near 200MB.



Why this behaviour? Why is Spark using practically all the available RAM 
if I am using only 1 worker with a 2.8GB limit in total?



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Improving performance of a kafka spark streaming app

2016-05-03 Thread Cody Koeninger
60 partitions in and of itself shouldn't be a big performance issue
(as long as producers are distributing across partitions evenly).

On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams  wrote:
> Thanks again Cody. Regarding the details 66 kafka partitions on 3
> kafka servers, likely 8 core systems with 10 disks each. Maybe the
> issue with the receiver was the large number of partitions. I had
> miscounted the disks and so 11*3*2 is how I decided to partition my
> topic on insertion, ( by my own, unjustified reasoning, on a first
> attempt ) . This worked well enough for me, I put 1.7 billion entries
> into Kafka on a map reduce job in 5 and a half hours.
>
> I was concerned using spark 1.5.2 because I'm currently putting my
> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library jars
> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
> yesterday, I tried building against 1.5.2. So far it's running without
> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
> improvement using the same code, but I'll see how the direct api
> handles it. In the end I can reduce the number of partitions in Kafka
> if it causes big performance issues.
>
> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger  wrote:
>> print() isn't really the best way to benchmark things, since it calls
>> take(10) under the covers, but 380 records / second for a single
>> receiver doesn't sound right in any case.
>>
>> Am I understanding correctly that you're trying to process a large
>> number of already-existing kafka messages, not keep up with an
>> incoming stream?  Can you give any details (e.g. hardware, number of
>> topicpartitions, etc)?
>>
>> Really though, I'd try to start with spark 1.6 and direct streams, or
>> even just kafkacat, as a baseline.
>>
>>
>>
>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams  
>> wrote:
>>> Hello again. I searched for "backport kafka" in the list archives but
>>> couldn't find anything but a post from Spark 0.7.2 . I was going to
>>> use accumulators to make a counter, but then saw on the Streaming tab
>>> the Receiver Statistics. Then I removed all other "functionality"
>>> except:
>>>
>>>
>>> JavaPairReceiverInputDStream dstream = KafkaUtils
>>>   //createStream(JavaStreamingContext jssc,Class
>>> keyTypeClass,Class valueTypeClass, Class keyDecoderClass,
>>> Class valueDecoderClass, java.util.Map kafkaParams,
>>> java.util.Map topics, StorageLevel storageLevel)
>>>   .createStream(jssc, byte[].class, byte[].class,
>>> kafka.serializer.DefaultDecoder.class,
>>> kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
>>> StorageLevel.MEMORY_AND_DISK_SER());
>>>
>>>dstream.print();
>>>
>>> Then in the Recieiver Stats for the single receiver, I'm seeing around
>>> 380 records / second. Then to get anywhere near my 10% mentioned
>>> above, I'd need to run around 21 receivers, assuming 380 records /
>>> second, just using the print output. This seems awfully high to me,
>>> considering that I wrote 8+ records a second to Kafka from a
>>> mapreduce job, and that my bottleneck was likely Hbase. Again using
>>> the 380 estimate, I would need 200+ receivers to reach a similar
>>> amount of reads.
>>>
>>> Even given the issues with the 1.2 receivers, is this the expected way
>>> to use the Kafka streaming API, or am I doing something terribly
>>> wrong?
>>>
>>> My application looks like
>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>
>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger  wrote:
 Have you tested for read throughput (without writing to hbase, just
 deserialize)?

 Are you limited to using spark 1.2, or is upgrading possible?  The
 kafka direct stream is available starting with 1.3.  If you're stuck
 on 1.2, I believe there have been some attempts to backport it, search
 the mailing list archives.

 On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams  
 wrote:
> I've written an application to get content from a kafka topic with 1.7
> billion entries,  get the protobuf serialized entries, and insert into
> hbase. Currently the environment that I'm running in is Spark 1.2.
>
> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
> 0-2500 writes / second. This will take much too long to consume the
> entries.
>
> I currently believe that the spark kafka receiver is the bottleneck.
> I've tried both 1.2 receivers, with the WAL and without, and didn't
> notice any large performance difference. I've tried many different
> spark configuration options, but can't seem to get better performance.
>
> I saw 8 requests / second inserting these records into kafka using
> yarn / hbase / protobuf / kafka in a bulk fashion.
>
> While hbase inserts 

Re: Improving performance of a kafka spark streaming app

2016-05-03 Thread Colin Kincaid Williams
Thanks again Cody. Regarding the details 66 kafka partitions on 3
kafka servers, likely 8 core systems with 10 disks each. Maybe the
issue with the receiver was the large number of partitions. I had
miscounted the disks and so 11*3*2 is how I decided to partition my
topic on insertion, ( by my own, unjustified reasoning, on a first
attempt ) . This worked well enough for me, I put 1.7 billion entries
into Kafka on a map reduce job in 5 and a half hours.

I was concerned using spark 1.5.2 because I'm currently putting my
data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library jars
built for spark 1.2 on CDH 5.3. But after debugging quite a bit
yesterday, I tried building against 1.5.2. So far it's running without
issue on a Spark 1.5.2 cluster. I'm not sure there was too much
improvement using the same code, but I'll see how the direct api
handles it. In the end I can reduce the number of partitions in Kafka
if it causes big performance issues.

On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger  wrote:
> print() isn't really the best way to benchmark things, since it calls
> take(10) under the covers, but 380 records / second for a single
> receiver doesn't sound right in any case.
>
> Am I understanding correctly that you're trying to process a large
> number of already-existing kafka messages, not keep up with an
> incoming stream?  Can you give any details (e.g. hardware, number of
> topicpartitions, etc)?
>
> Really though, I'd try to start with spark 1.6 and direct streams, or
> even just kafkacat, as a baseline.
>
>
>
> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams  wrote:
>> Hello again. I searched for "backport kafka" in the list archives but
>> couldn't find anything but a post from Spark 0.7.2 . I was going to
>> use accumulators to make a counter, but then saw on the Streaming tab
>> the Receiver Statistics. Then I removed all other "functionality"
>> except:
>>
>>
>> JavaPairReceiverInputDStream dstream = KafkaUtils
>>   //createStream(JavaStreamingContext jssc,Class
>> keyTypeClass,Class valueTypeClass, Class keyDecoderClass,
>> Class valueDecoderClass, java.util.Map kafkaParams,
>> java.util.Map topics, StorageLevel storageLevel)
>>   .createStream(jssc, byte[].class, byte[].class,
>> kafka.serializer.DefaultDecoder.class,
>> kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
>> StorageLevel.MEMORY_AND_DISK_SER());
>>
>>dstream.print();
>>
>> Then in the Recieiver Stats for the single receiver, I'm seeing around
>> 380 records / second. Then to get anywhere near my 10% mentioned
>> above, I'd need to run around 21 receivers, assuming 380 records /
>> second, just using the print output. This seems awfully high to me,
>> considering that I wrote 8+ records a second to Kafka from a
>> mapreduce job, and that my bottleneck was likely Hbase. Again using
>> the 380 estimate, I would need 200+ receivers to reach a similar
>> amount of reads.
>>
>> Even given the issues with the 1.2 receivers, is this the expected way
>> to use the Kafka streaming API, or am I doing something terribly
>> wrong?
>>
>> My application looks like
>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>
>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger  wrote:
>>> Have you tested for read throughput (without writing to hbase, just
>>> deserialize)?
>>>
>>> Are you limited to using spark 1.2, or is upgrading possible?  The
>>> kafka direct stream is available starting with 1.3.  If you're stuck
>>> on 1.2, I believe there have been some attempts to backport it, search
>>> the mailing list archives.
>>>
>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams  
>>> wrote:
 I've written an application to get content from a kafka topic with 1.7
 billion entries,  get the protobuf serialized entries, and insert into
 hbase. Currently the environment that I'm running in is Spark 1.2.

 With 8 executors and 2 cores, and 2 jobs, I'm only getting between
 0-2500 writes / second. This will take much too long to consume the
 entries.

 I currently believe that the spark kafka receiver is the bottleneck.
 I've tried both 1.2 receivers, with the WAL and without, and didn't
 notice any large performance difference. I've tried many different
 spark configuration options, but can't seem to get better performance.

 I saw 8 requests / second inserting these records into kafka using
 yarn / hbase / protobuf / kafka in a bulk fashion.

 While hbase inserts might not deliver the same throughput, I'd like to
 at least get 10%.

 My application looks like
 https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877

 This is my first spark application. I'd appreciate any assistance.

 

Re: removing header from csv file

2016-05-03 Thread Michael Segel
Hi, 
Another silly question… 

Don’t you want to use the header line to help create a schema for the RDD? 

Thx

-Mike
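
For reference, the spark-csv package can both consume the header and use it to
build a schema (a minimal sketch; the path is only a placeholder):

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")       // first line becomes the column names
  .option("inferSchema", "true")  // sample the data to infer column types
  .load("file:///path/to/data.csv")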

> On May 3, 2016, at 8:09 AM, Mathieu Longtin  wrote:
> 
> This only works if the files are "unsplittable". For example gzip files, each 
> partition is one file (if you have more partitions than files), so the first 
> line of each partition is the header.
> 
> The spark-csv extension reads the very first line of the RDD, assumes it's the 
> header, and then filters every occurrence of that line. Something like this 
> (Python code here, but Scala should be very similar):
> 
> header = data.first()
> data = data.filter(lambda line: line != header)
> 
> Since I had lots of small CSV files, and not all of them have the same exact 
> header, I use the following:
> 
> file_list = sc.parallelize(list_of_csv)
> data = 
> file_list.flatMap(function_that_reads_csvs_and_extracts_the_colums_I_want)
> 
> 
> 
> 
> On Tue, May 3, 2016 at 3:23 AM Abhishek Anand  > wrote:
> You can use this function to remove the header from your dataset (applicable 
> to RDDs):
> 
> def dropHeader(data: RDD[String]): RDD[String] = {
>   data.mapPartitionsWithIndex((idx, lines) => {
>     // Only the first partition contains the header line
>     if (idx == 0) lines.drop(1) else lines
>   })
> }
> 
> 
> Abhi 
> 
> On Wed, Apr 27, 2016 at 12:55 PM, Marco Mistroni  > wrote:
> If u r using Scala api you can do
> Myrdd.zipWithIndex.filter(_._2 > 0).map(_._1)
> 
> Maybe a little bit complicated but will do the trick
> As per spark CSV, you will get back a data frame which you can reconduct to 
> rdd. .
> Hth
> Marco
> 
> On 27 Apr 2016 6:59 am, "nihed mbarek"  > wrote:
> You can add a filter with a string that you are sure appears only in the 
> header 
> 
> Le mercredi 27 avril 2016, Divya Gehlot  > a écrit :
> yes, you can remove the header by removing the first row 
> 
> you can use first() or head() to do that 
> 
> 
> Thanks,
> Divya 
> 
> On 27 April 2016 at 13:24, Ashutosh Kumar > wrote:
> I see there is a library, spark-csv, which can be used for removing the header and 
> processing CSV files. But it seems it works with SQLContext only. Is there 
> a way to remove the header from CSV files without SQLContext? 
> 
> Thanks
> Ashutosh
> 
> 
> 
> -- 
> 
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com 
> 
>  
> 
> 
> -- 
> Mathieu Longtin
> 1-514-803-8977



Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Michael Segel
Silly question? 

If you change the predicate to 
( s.date >= ‘2016-01-03’ OR s.date IS NULL ) 
AND 
(d.date >= ‘2016-01-03’ OR d.date IS NULL) 

What do you get? 

Sorry if the syntax isn’t 100% correct. The idea is to not drop null values 
from the query. 
I would imagine that this shouldn’t kill performance since its most likely a 
post join filter on the result set? 
(Or is that just a Hive thing?) 

-Mike

> On May 3, 2016, at 12:42 PM, Davies Liu  wrote:
> 
> Bingo, the two predicates s.date >= '2016-01-03' AND d.date >=
> '2016-01-03' are the root cause,
> which will filter out all the nulls from the outer join and give the same
> result as an inner join.
> 
> In Spark 2.0, we turn these joins into inner joins actually.
> 
> On Tue, May 3, 2016 at 9:50 AM, Cesar Flores  wrote:
>> Hi
>> 
>> Have you tried the joins without the where clause? When you use them you are
>> filtering all the rows with null columns in those fields. In other words you
>> are doing an inner join in all your queries.
>> 
>> On Tue, May 3, 2016 at 11:37 AM, Gourav Sengupta 
>> wrote:
>>> 
>>> Hi Kevin,
>>> 
>>> Having given it a first look I do think that you have hit something here
>>> and this does not look quite fine. I have to work on the multiple AND
>>> conditions in ON and see whether that is causing any issues.
>>> 
>>> Regards,
>>> Gourav Sengupta
>>> 
>>> On Tue, May 3, 2016 at 8:28 AM, Kevin Peng  wrote:
 
 Davies,
 
 Here is the code that I am typing into the spark-shell along with the
 results (my question is at the bottom):
 
 val dps =
 sqlContext.read.format("com.databricks.spark.csv").option("header",
 "true").load("file:///home/ltu/dps_csv/")
 val swig =
 sqlContext.read.format("com.databricks.spark.csv").option("header",
 "true").load("file:///home/ltu/swig_csv/")
 
 dps.count
 res0: Long = 42694
 
 swig.count
 res1: Long = 42034
 
 
 dps.registerTempTable("dps_pin_promo_lt")
 swig.registerTempTable("swig_pin_promo_lt")
 
 sqlContext.sql("select * from dps_pin_promo_lt where date >
 '2016-01-03'").count
 res4: Long = 42666
 
 sqlContext.sql("select * from swig_pin_promo_lt where date >
 '2016-01-03'").count
 res5: Long = 34131
 
 sqlContext.sql("select distinct date, account, ad from dps_pin_promo_lt
 where date > '2016-01-03'").count
 res6: Long = 42533
 
 sqlContext.sql("select distinct date, account, ad from swig_pin_promo_lt
 where date > '2016-01-03'").count
 res7: Long = 34131
 
 
 sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
 AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
 d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
 dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad 
 =
 d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
 res9: Long = 23809
 
 
 sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
 AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
 d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
 dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad 
 =
 d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
 res10: Long = 23809
 
 
 sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
 AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
 d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
 dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad 
 =
 d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
 res11: Long = 23809
 
 
 sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
 AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
 d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
 dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad 
 =
 d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
 res12: Long = 23809
 
 
 
 From my results above, we notice that the counts of distinct values based
 on the join criteria and filter criteria for each individual table is
 located at res6 and res7.  My question is why is the outer join producing
 less rows than the smallest table; if there are no matches it should still
 bring in that row as part of the outer join.  For the full and right outer
 join I am expecting to see a minimum of res6 rows, but I get less, is there
 something specific that I am missing here?  I am expecting that the full
 outer join would give me the union of the two table sets so I am expecting

Spark 1.5.2 Shuffle Blocks - running out of memory

2016-05-03 Thread Nirav Patel
Hi,

My Spark application is getting killed abruptly during a groupBy operation
where a shuffle happens. All shuffles happen with PROCESS_LOCAL locality. I
see the following in the driver logs. Shouldn't these logs be in the executors?
Anyhow, it looks like the ByteBuffer is running out of memory. What would be a
workaround for this?


2016-05-02 22:38:53,595 INFO [sparkDriver-akka.actor.default-dispatcher-14]
org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 3 to hdn2.mycorp:45993
2016-05-02 22:38:53,832 INFO [sparkDriver-akka.actor.default-dispatcher-14]
org.apache.spark.storage.BlockManagerInfo: Removed broadcast_4_piece0 on
10.250.70.117:52328 in memory (size: 2.1 KB, free: 15.5 MB)
2016-05-02 22:39:03,704 WARN [New I/O worker #5]
org.jboss.netty.channel.DefaultChannelPipeline: An exception was thrown by
a user handler while handling an exception event ([id: 0xa8147f0c, /
10.250.70.110:48056 => /10.250.70.117:38300] EXCEPTION:
java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at
org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
at
org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
at
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
at
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
at
org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
2016-05-02 22:39:05,783 ERROR
[sparkDriver-akka.actor.default-dispatcher-14]
org.apache.spark.rpc.akka.ErrorMonitor: Uncaught fatal error from thread
[sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at
akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
at
akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at

Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Kevin Peng
Davies,

What exactly do you mean in regards to Spark 2.0 turning these join into an
inner join?  Does this mean that spark sql won't be supporting where
clauses in outer joins?


Cesar & Gourav,

When running the queries without the where clause it works as expected.  I
am pasting my results below:
val dps =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("file:///home/ltu/dps_csv/")
val swig =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("file:///home/ltu/swig_csv/")

dps.count
res0: Long = 42694

swig.count
res1: Long = 42034


dps.registerTempTable("dps_pin_promo_lt")
swig.registerTempTable("swig_pin_promo_lt")


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad)").count()
res5: Long = 60919


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad)").count()
res6: Long = 42034


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad)").count()
res7: Long = 42694

Thanks,

KP


On Tue, May 3, 2016 at 10:42 AM, Davies Liu  wrote:

> Bingo, the two predicate s.date >= '2016-01-03' AND d.date >=
> '2016-01-03' is the root cause,
>  which will filter out all the nulls from outer join, will have same
> result as inner join.
>
> In Spark 2.0, we turn these join into inner join actually.
>
> On Tue, May 3, 2016 at 9:50 AM, Cesar Flores  wrote:
> > Hi
> >
> > Have you tried the joins without the where clause? When you use them you
> are
> > filtering all the rows with null columns in those fields. In other words
> you
> > are doing a inner join in all your queries.
> >
> > On Tue, May 3, 2016 at 11:37 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com>
> > wrote:
> >>
> >> Hi Kevin,
> >>
> >> Having given it a first look I do think that you have hit something here
> >> and this does not look quite fine. I have to work on the multiple AND
> >> conditions in ON and see whether that is causing any issues.
> >>
> >> Regards,
> >> Gourav Sengupta
> >>
> >> On Tue, May 3, 2016 at 8:28 AM, Kevin Peng  wrote:
> >>>
> >>> Davies,
> >>>
> >>> Here is the code that I am typing into the spark-shell along with the
> >>> results (my question is at the bottom):
> >>>
> >>> val dps =
> >>> sqlContext.read.format("com.databricks.spark.csv").option("header",
> >>> "true").load("file:///home/ltu/dps_csv/")
> >>> val swig =
> >>> sqlContext.read.format("com.databricks.spark.csv").option("header",
> >>> "true").load("file:///home/ltu/swig_csv/")
> >>>
> >>> dps.count
> >>> res0: Long = 42694
> >>>
> >>> swig.count
> >>> res1: Long = 42034
> >>>
> >>>
> >>> dps.registerTempTable("dps_pin_promo_lt")
> >>> swig.registerTempTable("swig_pin_promo_lt")
> >>>
> >>> sqlContext.sql("select * from dps_pin_promo_lt where date >
> >>> '2016-01-03'").count
> >>> res4: Long = 42666
> >>>
> >>> sqlContext.sql("select * from swig_pin_promo_lt where date >
> >>> '2016-01-03'").count
> >>> res5: Long = 34131
> >>>
> >>> sqlContext.sql("select distinct date, account, ad from dps_pin_promo_lt
> >>> where date > '2016-01-03'").count
> >>> res6: Long = 42533
> >>>
> >>> sqlContext.sql("select distinct date, account, ad from
> swig_pin_promo_lt
> >>> where date > '2016-01-03'").count
> >>> res7: Long = 34131
> >>>
> >>>
> >>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  ,
> d.account
> >>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
> >>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
> >>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
> s.ad =
> >>> d.ad) WHERE s.date >= '2016-01-03' AND d.date >=
> '2016-01-03'").count()
> >>> res9: Long = 23809
> >>>
> >>>
> >>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  ,
> d.account
> >>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
> >>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
> >>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
> s.ad =
> >>> d.ad) WHERE s.date >= '2016-01-03' AND d.date >=
> '2016-01-03'").count()
> >>> res10: Long = 23809
> >>>
> >>>
> >>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  ,
> d.account
> >>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
> >>> 

Re: yarn-cluster

2016-05-03 Thread nsalian
Hello,

Thank you for the question.
The status UNDEFINED means the application has not completed and has not yet
been allocated resources.
Upon getting an assignment it will progress to RUNNING, and then to SUCCEEDED
upon completion.

It isn't a problem that you should worry about.
You should make sure to tune your YARN settings to help this work
appropriately and get the number of containers that the application needs.
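
As a rough illustration (the values below are placeholders, not recommendations),
sizing the application explicitly when submitting to YARN looks like this:

spark-submit \
  --master yarn-cluster \
  --num-executors 4 \
  --executor-cores 2 \
  --executor-memory 4G \
  --driver-memory 2G \
  my_app.jar

The requested executor memory and cores have to fit inside what YARN is allowed
to allocate per container; otherwise the application waits for resources instead
of progressing to RUNNING.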





-
Neelesh S. Salian
Cloudera
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/yarn-cluster-tp26846p26871.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Davies Liu
Bingo, the two predicate s.date >= '2016-01-03' AND d.date >=
'2016-01-03' is the root cause,
 which will filter out all the nulls from outer join, will have same
result as inner join.

In Spark 2.0, we turn these join into inner join actually.
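
For anyone hitting the same surprise: a WHERE predicate that references columns
from both sides of an outer join removes exactly the null-extended rows the outer
join added, so the result collapses to the inner-join result. To keep true
outer-join semantics, apply the per-side filters before (or inside) the join. A
minimal sketch, shown here in pyspark syntax and assuming the same temp tables as
in the thread:

filtered = sqlContext.sql("""
    SELECT s.date AS edate, s.account AS s_acc, d.account AS d_acc,
           s.ad AS s_ad, d.ad AS d_ad, s.spend AS s_spend,
           d.spend_in_dollar AS d_spend
    FROM (SELECT * FROM swig_pin_promo_lt WHERE date >= '2016-01-03') s
    FULL OUTER JOIN
         (SELECT * FROM dps_pin_promo_lt WHERE date >= '2016-01-03') d
      ON (s.date = d.date AND s.account = d.account AND s.ad = d.ad)
""")
filtered.count()  # keeps unmatched rows from both filtered sides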

On Tue, May 3, 2016 at 9:50 AM, Cesar Flores  wrote:
> Hi
>
> Have you tried the joins without the where clause? When you use them you are
> filtering all the rows with null columns in those fields. In other words you
> are doing a inner join in all your queries.
>
> On Tue, May 3, 2016 at 11:37 AM, Gourav Sengupta 
> wrote:
>>
>> Hi Kevin,
>>
>> Having given it a first look I do think that you have hit something here
>> and this does not look quite fine. I have to work on the multiple AND
>> conditions in ON and see whether that is causing any issues.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, May 3, 2016 at 8:28 AM, Kevin Peng  wrote:
>>>
>>> Davies,
>>>
>>> Here is the code that I am typing into the spark-shell along with the
>>> results (my question is at the bottom):
>>>
>>> val dps =
>>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>>> "true").load("file:///home/ltu/dps_csv/")
>>> val swig =
>>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>>> "true").load("file:///home/ltu/swig_csv/")
>>>
>>> dps.count
>>> res0: Long = 42694
>>>
>>> swig.count
>>> res1: Long = 42034
>>>
>>>
>>> dps.registerTempTable("dps_pin_promo_lt")
>>> swig.registerTempTable("swig_pin_promo_lt")
>>>
>>> sqlContext.sql("select * from dps_pin_promo_lt where date >
>>> '2016-01-03'").count
>>> res4: Long = 42666
>>>
>>> sqlContext.sql("select * from swig_pin_promo_lt where date >
>>> '2016-01-03'").count
>>> res5: Long = 34131
>>>
>>> sqlContext.sql("select distinct date, account, ad from dps_pin_promo_lt
>>> where date > '2016-01-03'").count
>>> res6: Long = 42533
>>>
>>> sqlContext.sql("select distinct date, account, ad from swig_pin_promo_lt
>>> where date > '2016-01-03'").count
>>> res7: Long = 34131
>>>
>>>
>>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
>>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad =
>>> d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
>>> res9: Long = 23809
>>>
>>>
>>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
>>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad =
>>> d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
>>> res10: Long = 23809
>>>
>>>
>>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
>>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad =
>>> d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
>>> res11: Long = 23809
>>>
>>>
>>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
>>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad =
>>> d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
>>> res12: Long = 23809
>>>
>>>
>>>
>>> From my results above, we notice that the counts of distinct values based
>>> on the join criteria and filter criteria for each individual table is
>>> located at res6 and res7.  My question is why is the outer join producing
>>> less rows than the smallest table; if there are no matches it should still
>>> bring in that row as part of the outer join.  For the full and right outer
>>> join I am expecting to see a minimum of res6 rows, but I get less, is there
>>> something specific that I am missing here?  I am expecting that the full
>>> outer join would give me the union of the two table sets so I am expecting
>>> at least 42533 rows not 23809.
>>>
>>>
>>> Gourav,
>>>
>>> I just ran this result set on a new session with slightly newer data...
>>> still seeing those results.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> KP
>>>
>>>
>>> On Mon, May 2, 2016 at 11:16 PM, Davies Liu 
>>> wrote:

 as @Gourav said, all the join with different join type show the same
 results,
 which meant that all the rows from left could match at least one row
 from right,
 all the rows from right could match at least one row from left, even
 the number of row from left does not equal that of right.

 This is correct result.

 

Re: how to orderBy previous groupBy.count.orderBy in pyspark

2016-05-03 Thread webe3vt
Here is what I ended up doing.  Improvements are welcome.

from pyspark.sql import SQLContext, Row
from pyspark.sql.types import StructType, StructField, IntegerType,
StringType
from pyspark.sql.functions import asc, desc, sum, count
sqlContext = SQLContext(sc)

error_schema = StructType([
StructField('id', IntegerType(), nullable=False),
StructField('error_code', IntegerType(),
nullable=False),
StructField('error_desc', StringType(), nullable=False)
])
error_data = sc.parallelize([
Row(1, 1, 'type 1 error'),
Row(1, 2, 'type 2 error'),
Row(2, 4, 'type 4 error'),
Row(2, 3, 'type 3 error'),
Row(2, 3, 'type 3 error'),
Row(2, 2, 'type 2 error'),
Row(2, 1, 'type 1 error'),
Row(3, 2, 'type 2 error'),
Row(3, 2, 'type 2 error'),
Row(3, 2, 'type 2 error'),
Row(3, 1, 'type 1 error'),
Row(3, 3, 'type 3 error'),
Row(3, 1, 'type 1 error'),
Row(3, 1, 'type 1 error'),
Row(3, 4, 'type 4 error'),
Row(3, 5, 'type 5 error'),
Row(3, 1, 'type 1 error'),
Row(3, 1, 'type 1 error'),
Row(3, 2, 'type 2 error'),
Row(3, 4, 'type 4 error'),
Row(3, 1, 'type 1 error'),

])
error_df = sqlContext.createDataFrame(error_data, error_schema)
error_df.show()
id_count_df =
error_df.groupBy(error_df["id"]).agg(count(error_df["id"]).alias("id_count")).orderBy(desc("id_count"))
joined_df = error_df.join(id_count_df, "id")
error_count_df = joined_df.groupBy(joined_df["id"], joined_df["error_code"],
joined_df["error_desc"]).agg(count(joined_df["id"]).alias("error_count"))
joined_df2 = joined_df.join(error_count_df, ["id", "error_code",
"error_desc"])
joined_df2.distinct().orderBy(desc("id_count"),
desc("error_count")).select("id", "error_code", "error_desc",
"error_count").show()


+---+----------+------------+
| id|error_code|  error_desc|
+---+----------+------------+
|  1|         1|type 1 error|
|  1|         2|type 2 error|
|  2|         4|type 4 error|
|  2|         3|type 3 error|
|  2|         3|type 3 error|
|  2|         2|type 2 error|
|  2|         1|type 1 error|
|  3|         2|type 2 error|
|  3|         2|type 2 error|
|  3|         2|type 2 error|
|  3|         1|type 1 error|
|  3|         3|type 3 error|
|  3|         1|type 1 error|
|  3|         1|type 1 error|
|  3|         4|type 4 error|
|  3|         5|type 5 error|
|  3|         1|type 1 error|
|  3|         1|type 1 error|
|  3|         2|type 2 error|
|  3|         4|type 4 error|
+---+----------+------------+
only showing top 20 rows

+---+----------+------------+-----------+
| id|error_code|  error_desc|error_count|
+---+----------+------------+-----------+
|  3|         1|type 1 error|          6|
|  3|         2|type 2 error|          4|
|  3|         4|type 4 error|          2|
|  3|         5|type 5 error|          1|
|  3|         3|type 3 error|          1|
|  2|         3|type 3 error|          2|
|  2|         2|type 2 error|          1|
|  2|         4|type 4 error|          1|
|  2|         1|type 1 error|          1|
|  1|         1|type 1 error|          1|
|  1|         2|type 2 error|          1|
+---+----------+------------+-----------+
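
Since improvements were invited: one way to avoid the two self-joins above is to
aggregate once per (id, error_code, error_desc) and attach the per-id total with
a window function. A rough, untested sketch (note that window functions in
Spark 1.x require a HiveContext rather than a plain SQLContext):

from pyspark.sql import Window
from pyspark.sql.functions import count, desc, sum as sum_

# counts per (id, error_code, error_desc)
error_count_df = (error_df
                  .groupBy("id", "error_code", "error_desc")
                  .agg(count("*").alias("error_count")))

# total errors per id, attached via a window instead of a join back
w = Window.partitionBy("id")
(error_count_df
 .withColumn("id_count", sum_("error_count").over(w))
 .orderBy(desc("id_count"), desc("error_count"))
 .select("id", "error_code", "error_desc", "error_count")
 .show())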




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-orderBy-previous-groupBy-count-orderBy-in-pyspark-tp26864p26870.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Cesar Flores
Hi

Have you tried the joins without the where clause? When you use them you
are filtering all the rows with null columns in those fields. In other
words you are doing an inner join in all your queries.

On Tue, May 3, 2016 at 11:37 AM, Gourav Sengupta 
wrote:

> Hi Kevin,
>
> Having given it a first look I do think that you have hit something here
> and this does not look quite fine. I have to work on the multiple AND
> conditions in ON and see whether that is causing any issues.
>
> Regards,
> Gourav Sengupta
>
> On Tue, May 3, 2016 at 8:28 AM, Kevin Peng  wrote:
>
>> Davies,
>>
>> Here is the code that I am typing into the spark-shell along with the
>> results (my question is at the bottom):
>>
>> val dps =
>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>> "true").load("file:///home/ltu/dps_csv/")
>> val swig =
>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>> "true").load("file:///home/ltu/swig_csv/")
>>
>> dps.count
>> res0: Long = 42694
>>
>> swig.count
>> res1: Long = 42034
>>
>>
>> dps.registerTempTable("dps_pin_promo_lt")
>> swig.registerTempTable("swig_pin_promo_lt")
>>
>> sqlContext.sql("select * from dps_pin_promo_lt where date >
>> '2016-01-03'").count
>> res4: Long = 42666
>>
>> sqlContext.sql("select * from swig_pin_promo_lt where date >
>> '2016-01-03'").count
>> res5: Long = 34131
>>
>> sqlContext.sql("select distinct date, account, ad from dps_pin_promo_lt
>> where date > '2016-01-03'").count
>> res6: Long = 42533
>>
>> sqlContext.sql("select distinct date, account, ad from swig_pin_promo_lt
>> where date > '2016-01-03'").count
>> res7: Long = 34131
>>
>>
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad = d.ad) WHERE s.date >= '2016-01-03' AND d.date >=
>> '2016-01-03'").count()
>> res9: Long = 23809
>>
>>
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad = d.ad) WHERE s.date >= '2016-01-03' AND d.date >=
>> '2016-01-03'").count()
>> res10: Long = 23809
>>
>>
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad = d.ad) WHERE s.date >= '2016-01-03' AND d.date >=
>> '2016-01-03'").count()
>> res11: Long = 23809
>>
>>
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad = d.ad) WHERE s.date >= '2016-01-03' AND d.date >=
>> '2016-01-03'").count()
>> res12: Long = 23809
>>
>>
>>
>> From my results above, we notice that the counts of distinct values based
>> on the join criteria and filter criteria for each individual table is
>> located at res6 and res7.  My question is why is the outer join producing
>> less rows than the smallest table; if there are no matches it should still
>> bring in that row as part of the outer join.  For the full and right outer
>> join I am expecting to see a minimum of res6 rows, but I get less, is there
>> something specific that I am missing here?  I am expecting that the full
>> outer join would give me the union of the two table sets so I am expecting
>> at least 42533 rows not 23809.
>>
>>
>> Gourav,
>>
>> I just ran this result set on a new session with slightly newer data...
>> still seeing those results.
>>
>>
>>
>> Thanks,
>>
>> KP
>>
>>
>> On Mon, May 2, 2016 at 11:16 PM, Davies Liu 
>> wrote:
>>
>>> as @Gourav said, all the join with different join type show the same
>>> results,
>>> which meant that all the rows from left could match at least one row
>>> from right,
>>> all the rows from right could match at least one row from left, even
>>> the number of row from left does not equal that of right.
>>>
>>> This is correct result.
>>>
>>> On Mon, May 2, 2016 at 2:13 PM, Kevin Peng  wrote:
>>> > Yong,
>>> >
>>> > Sorry, let explain my deduction; it is going be difficult to get a
>>> sample
>>> > data out since the dataset I am using is proprietary.
>>> >
>>> > From the above set queries (ones mentioned in above comments) both
>>> inner and
>>> > outer join are producing the same counts.  They are basically pulling
>>> out
>>> > selected columns from the 

Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Gourav Sengupta
Hi Kevin,

Having given it a first look I do think that you have hit something here
and this does not look quite fine. I have to work on the multiple AND
conditions in ON and see whether that is causing any issues.

Regards,
Gourav Sengupta

On Tue, May 3, 2016 at 8:28 AM, Kevin Peng  wrote:

> Davies,
>
> Here is the code that I am typing into the spark-shell along with the
> results (my question is at the bottom):
>
> val dps =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").load("file:///home/ltu/dps_csv/")
> val swig =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").load("file:///home/ltu/swig_csv/")
>
> dps.count
> res0: Long = 42694
>
> swig.count
> res1: Long = 42034
>
>
> dps.registerTempTable("dps_pin_promo_lt")
> swig.registerTempTable("swig_pin_promo_lt")
>
> sqlContext.sql("select * from dps_pin_promo_lt where date >
> '2016-01-03'").count
> res4: Long = 42666
>
> sqlContext.sql("select * from swig_pin_promo_lt where date >
> '2016-01-03'").count
> res5: Long = 34131
>
> sqlContext.sql("select distinct date, account, ad from dps_pin_promo_lt
> where date > '2016-01-03'").count
> res6: Long = 42533
>
> sqlContext.sql("select distinct date, account, ad from swig_pin_promo_lt
> where date > '2016-01-03'").count
> res7: Long = 34131
>
>
> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
> = d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
> res9: Long = 23809
>
>
> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
> = d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
> res10: Long = 23809
>
>
> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
> = d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
> res11: Long = 23809
>
>
> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
> = d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
> res12: Long = 23809
>
>
>
> From my results above, we notice that the counts of distinct values based
> on the join criteria and filter criteria for each individual table is
> located at res6 and res7.  My question is why is the outer join producing
> less rows than the smallest table; if there are no matches it should still
> bring in that row as part of the outer join.  For the full and right outer
> join I am expecting to see a minimum of res6 rows, but I get less, is there
> something specific that I am missing here?  I am expecting that the full
> outer join would give me the union of the two table sets so I am expecting
> at least 42533 rows not 23809.
>
>
> Gourav,
>
> I just ran this result set on a new session with slightly newer data...
> still seeing those results.
>
>
>
> Thanks,
>
> KP
>
>
> On Mon, May 2, 2016 at 11:16 PM, Davies Liu  wrote:
>
>> as @Gourav said, all the join with different join type show the same
>> results,
>> which meant that all the rows from left could match at least one row from
>> right,
>> all the rows from right could match at least one row from left, even
>> the number of row from left does not equal that of right.
>>
>> This is correct result.
>>
>> On Mon, May 2, 2016 at 2:13 PM, Kevin Peng  wrote:
>> > Yong,
>> >
>> > Sorry, let explain my deduction; it is going be difficult to get a
>> sample
>> > data out since the dataset I am using is proprietary.
>> >
>> > From the above set queries (ones mentioned in above comments) both
>> inner and
>> > outer join are producing the same counts.  They are basically pulling
>> out
>> > selected columns from the query, but there is no roll up happening or
>> > anything that would possible make it suspicious that there is any
>> difference
>> > besides the type of joins.  The tables are matched based on three keys
>> that
>> > are present in both tables (ad, account, and date), on top of this they
>> are
>> > filtered by date being above 2016-01-03.  Since all the joins are
>> producing
>> > the same counts, the natural suspicions is that the 

Re: Error from reading S3 in Scala

2016-05-03 Thread Gourav Sengupta
Hi,

The best thing to do is to start the EMR clusters with proper permissions in
the roles; that way you do not need to worry about the keys at all.

Another thing: why are we using s3a:// instead of s3:// ?

Besides that you can increase s3 speeds using the instructions mentioned
here:
https://aws.amazon.com/blogs/aws/aws-storage-update-amazon-s3-transfer-acceleration-larger-snowballs-in-more-regions/


Regards,
Gourav

On Tue, May 3, 2016 at 12:04 PM, Steve Loughran 
wrote:

> don't put your secret in the URI, it'll only creep out in the logs.
>
> Use the specific properties coverd in
> http://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html,
> which you can set in your spark context by prefixing them with spark.hadoop.
>
> you can also set the env vars, AWS_ACCESS_KEY_ID and
> AWS_SECRET_ACCESS_KEY; SparkEnv will pick these up and set the relevant
> spark context keys for you
>
>
> On 3 May 2016, at 01:53, Zhang, Jingyu  wrote:
>
> Hi All,
>
> I am using Eclipse with Maven for developing Spark applications. I got a
> error for Reading from S3 in Scala but it works fine in Java when I run
> them in the same project in Eclipse. The Scala/Java code and the error in
> following
>
>
> Scala
>
> val uri = URI.create("s3a://" + key + ":" + seckey + "@" +
> "graphclustering/config.properties");
> val pt = new Path("s3a://" + key + ":" + seckey + "@" +
> "graphclustering/config.properties");
> val fs = FileSystem.get(uri,ctx.hadoopConfiguration);
> val  inputStream:InputStream = fs.open(pt);
>
> Exception: on aws-java-1.7.4 and hadoop-aws-2.6.1
>
> Exception in thread "main"
> com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
> Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
> 8A56DC7BF0BFF09A), S3 Extended Request ID
>
> at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(
> AmazonHttpClient.java:1160)
>
> at com.amazonaws.http.AmazonHttpClient.executeOneRequest(
> AmazonHttpClient.java:748)
>
> at com.amazonaws.http.AmazonHttpClient.executeHelper(
> AmazonHttpClient.java:467)
>
> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:302)
>
> at com.amazonaws.services.s3.AmazonS3Client.invoke(
> AmazonS3Client.java:3785)
>
> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(
> AmazonS3Client.java:1050)
>
> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(
> AmazonS3Client.java:1027)
>
> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(
> S3AFileSystem.java:688)
>
> at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:222)
>
> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
>
> at com.news.report.graph.GraphCluster$.main(GraphCluster.scala:53)
>
> at com.news.report.graph.GraphCluster.main(GraphCluster.scala)
>
> 16/05/03 10:49:17 INFO SparkContext: Invoking stop() from shutdown hook
>
> 16/05/03 10:49:17 INFO SparkUI: Stopped Spark web UI at
> http://10.65.80.125:4040
>
> 16/05/03 10:49:17 INFO MapOutputTrackerMasterEndpoint:
> MapOutputTrackerMasterEndpoint stopped!
>
> 16/05/03 10:49:17 INFO MemoryStore: MemoryStore cleared
>
> 16/05/03 10:49:17 INFO BlockManager: BlockManager stopped
>
> Exception: on aws-java-1.7.4 and hadoop-aws-2.7.2
>
> 16/05/03 10:23:40 INFO Slf4jLogger: Slf4jLogger started
>
> 16/05/03 10:23:40 INFO Remoting: Starting remoting
>
> 16/05/03 10:23:40 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriverActorSystem@10.65.80.125:61860]
>
> 16/05/03 10:23:40 INFO Utils: Successfully started service
> 'sparkDriverActorSystem' on port 61860.
>
> 16/05/03 10:23:40 INFO SparkEnv: Registering MapOutputTracker
>
> 16/05/03 10:23:40 INFO SparkEnv: Registering BlockManagerMaster
>
> 16/05/03 10:23:40 INFO DiskBlockManager: Created local directory at
> /private/var/folders/sc/tdmkbvr1705b8p70xqj1kqks5l9p
>
> 16/05/03 10:23:40 INFO MemoryStore: MemoryStore started with capacity
> 1140.4 MB
>
> 16/05/03 10:23:40 INFO SparkEnv: Registering OutputCommitCoordinator
>
> 16/05/03 10:23:40 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
>
> 16/05/03 10:23:40 INFO SparkUI: Started SparkUI at
> http://10.65.80.125:4040
>
> 16/05/03 10:23:40 INFO Executor: Starting executor ID driver on host
> localhost
>
> 16/05/03 10:23:40 INFO Utils: Successfully started service
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 61861.
>
> 16/05/03 10:23:40 INFO NettyBlockTransferService: Server created on 61861
>
> 16/05/03 10:23:40 INFO BlockManagerMaster: Trying to register BlockManager
>
> 16/05/03 10:23:40 INFO BlockManagerMasterEndpoint: Registering block
> manager localhost:61861 with 1140.4 MB RAM, BlockManagerId(driver,
> localhost, 61861)
>
> 16/05/03 10:23:40 INFO BlockManagerMaster: Registered BlockManager
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> 

--jars for mesos cluster

2016-05-03 Thread Alex Dzhagriev
Hello all,

In the Mesos related spark docs (
http://spark.apache.org/docs/1.6.0/running-on-mesos.html#cluster-mode) I
found this statement:

Note that jars or python files that are passed to spark-submit should be
> URIs reachable by Mesos slaves, as the Spark driver doesn’t automatically
> upload local jars.



However, when I'm trying to submit the job using:

./spark-submit --class MyDriver --master mesos://my-mesos:7077
--deploy-mode cluster --supervise --executor-memory 2G --jars
http://3rd-party.jar http://my.jar

I don't see the --jars on the slave machine:

sh -c './bin/spark-submit --name MyDriver --master mesos://my-mesos:7077
--driver-cores 1.0 --driver-memory 1024M --class MyDriver --executor-memory
2G ./my.jar '

And of course I'm running into the ClassNotFoundException as the 3rd party
library is not there.

Can someone, please, help how to specify the --jars correctly?

Thanks, Alex.


Re: Multiple Spark Applications that use Cassandra, how to share resources/nodes

2016-05-03 Thread Andy Davidson
Hi Tobias

I am very interested in implementing a REST-based API on top of Spark. My
REST-based system would make predictions from data provided in the request,
using models trained in batch. My SLA is 250 ms.

Would you mind sharing how you implemented your rest server?

I am using spark-1.6.1. I have several unit tests that create a Spark context
with master set to 'local[4]'. I do not think the unit test approach is going to
scale. Can each REST server have a pool of Spark contexts?


The system we would like to replace is set up as follows:

Layer of dumb load balancers: l1, l2, l3
Layer of proxy servers:   p1, p2, p3, p4, p5, ... Pn
Layer of containers:  c1, c2, c3, ... Cn

Where Cn is much larger than Pn


Kind regards

Andy

P.S. There is a talk on 5/5 about Spark 2.0. Hoping there is something in the
near future.
https://www.brighttalk.com/webcast/12891/202021?utm_campaign=google-calendar
_content=_source=brighttalk-portal_medium=calendar_term=

From:  Tobias Eriksson 
Date:  Tuesday, May 3, 2016 at 7:34 AM
To:  "user @spark" 
Subject:  Multiple Spark Applications that use Cassandra, how to share
resources/nodes

> Hi 
>  We are using Spark for a long running job, in fact it is a REST-server that
> does some joins with some tables in Casandra and returns the result.
> Now we need to have multiple applications running in the same Spark cluster,
> and from what I understand this is not possible, or should I say somewhat
> complicated
> 1. A Spark application takes all the resources / nodes in the cluster (we have
> 4 nodes one for each Cassandra Node)
> 2. A Spark application returns its resources when it is done (exits or the
> context is closed/returned)
> 3. Sharing resources using Mesos only allows scaling down and then scaling up
> by a step-by-step policy, i.e. 2 nodes, 3 nodes, 4 nodes, ... And increases as
> the need increases
> But if this is true, I can not have several applications running in parallell,
> is that true ?
> If I use Mesos then the whole idea with one Spark Worker per Cassandra Node
> fails, as it talks directly to a node, and that is how it is so efficient.
> In this case I need all nodes, not 3 out of 4.
> 
> Any mistakes in my thinking ?
> Any ideas on how to solve this ? Should be a common problem I think
> 
> -Tobias
> 
> 




Multiple Spark Applications that use Cassandra, how to share resources/nodes

2016-05-03 Thread Tobias Eriksson
Hi
 We are using Spark for a long running job, in fact it is a REST-server that 
does some joins with some tables in Cassandra and returns the result.
Now we need to have multiple applications running in the same Spark cluster, 
and from what I understand this is not possible, or should I say somewhat 
complicated

  1.  A Spark application takes all the resources / nodes in the cluster (we 
have 4 nodes one for each Cassandra Node)
  2.  A Spark application returns its resources when it is done (exits or the 
context is closed/returned)
  3.  Sharing resources using Mesos only allows scaling down and then scaling 
up by a step-by-step policy, i.e. 2 nodes, 3 nodes, 4 nodes, … And increases as 
the need increases

But if this is true, I cannot have several applications running in parallel, 
is that true ?
If I use Mesos then the whole idea with one Spark Worker per Cassandra Node 
fails, as it talks directly to a node, and that is how it is so efficient.
In this case I need all nodes, not 3 out of 4.

Any mistakes in my thinking ?
Any ideas on how to solve this ? Should be a common problem I think

-Tobias




Re: Spark streaming app starts processing when kill that app

2016-05-03 Thread Shams ul Haque
Hey Hareesh,

Thanks for the help, it was indeed starvation. I increased the cores and memory
on that machine. Now it is working fine.

Thanks again

On Tue, May 3, 2016 at 12:57 PM, Shams ul Haque  wrote:

> No, i made a cluster of 2 machines. And after submission to master, this
> app moves on slave machine for execution.
> Well i am going to give a try to your suggestion by running both on same
> machine.
>
> Thanks
> Shams
>
> On Tue, May 3, 2016 at 12:53 PM, hareesh makam 
> wrote:
>
>> If you are running your master on a single core, it might be an issue of
>> Starvation.
>> assuming you are running it locally, try setting master to local[2] or
>> higher.
>>
>> Check the first example at
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html
>>
>> - Hareesh
>>
>> On 3 May 2016 at 12:35, Shams ul Haque  wrote:
>>
>>> Hi all,
>>>
>>> I am facing strange issue when running Spark Streaming app.
>>>
>>> What i was doing is, When i submit my app by *spark-submit *it works
>>> fine and also visible in Spark UI. But it doesn't process any data coming
>>> from kafka. And when i kill that app by pressing Ctrl + C on terminal, then
>>> it start processing all data received from Kafka and then get shutdown.
>>>
>>> I am trying to figure out why is this happening. Please help me if you
>>> know anything.
>>>
>>> Thanks and regards
>>> Shams ul Haque
>>>
>>
>>
>
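
For reference, the pattern hareesh points to in the streaming guide is simply to
give a local run at least two threads, so the receiver does not starve the
processing tasks. A minimal pyspark sketch (the socket source, port, and batch
interval are placeholders):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("StarvationCheck").setMaster("local[2]")  # at least 2, never local[1]
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)  # 5-second batches
lines = ssc.socketTextStream("localhost", 9999)  # placeholder source
lines.count().pprint()
ssc.start()
ssc.awaitTermination()

On a cluster the equivalent failure mode is having fewer executor cores than
receivers, which matches the fix described above (adding cores and memory).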


unsubscribe

2016-05-03 Thread Rodrick Brown
unsubscribe

--

Rodrick Brown / Systems Engineer
+1 917 445 6839 / rodr...@orchardplatform.com

Orchard Platform
101 5th Avenue, 4th Floor, New York, NY 10003
http://www.orchardplatform.com

Orchard Blog: http://www.orchardplatform.com/blog/ | Marketplace Lending Meetup: http://www.meetup.com/Peer-to-Peer-Lending-P2P/




Re: removing header from csv file

2016-05-03 Thread Mathieu Longtin
This only works if the files are "unsplittable". For example gzip files,
each partition is one file (if you have more partitions than files), so the
first line of each partition is the header.

The spark-csv extension reads the very first line of the RDD, assumes it's the
header, and then filters every occurrence of that line. Something like this
(Python code here, but Scala would be very similar):

header = data.first()
data = data.filter(lambda line: line != header)

Since I had lots of small CSV files, and not all of them have the same
exact header, I use the following:

file_list = sc.parallelize(list_of_csv)
data =
file_list.flatMap(function_that_reads_csvs_and_extracts_the_colums_I_want)
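
One possible shape for that per-file function, as a sketch only (the delimiter,
the selected column indices, and the assumption that every executor can open the
paths locally are all assumptions, not part of the original message):

import csv

def read_csv_keep_columns(path):
    # runs on the executors, one call per CSV file path
    with open(path) as f:
        reader = csv.reader(f)
        next(reader, None)          # skip this file's own header line
        for row in reader:
            yield (row[0], row[2])  # keep only the columns of interest

data = file_list.flatMap(read_csv_keep_columns)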




On Tue, May 3, 2016 at 3:23 AM Abhishek Anand 
wrote:

> You can use this function to remove the header from your
> dataset(applicable to RDD)
>
> def dropHeader(data: RDD[String]): RDD[String] = {
>   data.mapPartitionsWithIndex((idx, lines) => {
>     if (idx == 0) lines.drop(1) else lines
>   })
> }
>
>
> Abhi
>
> On Wed, Apr 27, 2016 at 12:55 PM, Marco Mistroni 
> wrote:
>
>> If you are using the Scala API you can do
>> Myrdd.zipWithIndex.filter(_._2 > 0).map(_._1)
>>
>> Maybe a little bit complicated but will do the trick
>> As per spark CSV, you will get back a data frame which you can reconduct
>> to rdd. .
>> Hth
>> Marco
>> On 27 Apr 2016 6:59 am, "nihed mbarek"  wrote:
>>
>>> You can add a filter with string that you are sure available only in the
>>> header
>>>
>>> On Wednesday, 27 April 2016, Divya Gehlot  wrote:
>>>
 yes you can remove the headers by removing the first row

 can first() or head() to do that


 Thanks,
 Divya

 On 27 April 2016 at 13:24, Ashutosh Kumar 
 wrote:

> I see there is a library spark-csv which can be used for removing
> header and processing of csv files. But it seems it works with sqlcontext
> only. Is there a way to remove header from csv files without sqlcontext ?
>
> Thanks
> Ashutosh
>


>>>
>>> --
>>>
>>> M'BAREK Med Nihed,
>>> Fedora Ambassador, TUNISIA, Northern Africa
>>> http://www.nihed.com
>>>
>>> 
>>>
>>>
>>> --
Mathieu Longtin
1-514-803-8977


Re: parquet table in spark-sql

2016-05-03 Thread Sandeep Nemuri
We don't need any delimiters for Parquet file format.
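
For example, a create-table statement for a Parquet-backed table only needs the
storage clause; no ROW FORMAT DELIMITED / FIELDS TERMINATED BY lines are
required. A sketch (the column names are placeholders, and a HiveContext is
assumed):

sqlContext.sql("""
    CREATE TABLE IF NOT EXISTS calls_parquet (
        caller_id       STRING,
        total_duration  BIGINT
    )
    STORED AS PARQUET
""")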


On Tue, May 3, 2016 at 5:31 AM, Varadharajan Mukundan 
wrote:

> Hi,
>
> Yes, it is not needed. Delimiters are need only for text files.
>
> On Tue, May 3, 2016 at 12:49 PM, 喜之郎 <251922...@qq.com> wrote:
>
>> hi, I want to ask a question about parquet table in spark-sql table.
>>
>> I think that parquet have schema information in its own file.
>> so you don't need define row separator and column separator in
>> create-table DDL, like that:
>>
>> total_duration  BigInt)
>> ROW FORMAT DELIMITED
>>   FIELDS TERMINATED BY ','
>>   LINES TERMINATED BY '\n'
>>
>> can anyone give me a answer? thanks
>>
>>
>>
>
>
> --
> Thanks,
> M. Varadharajan
>
> 
>
> "Experience is what you get when you didn't get what you wanted"
>-By Prof. Randy Pausch in "The Last Lecture"
>
> My Journal :- http://varadharajan.in
>



-- 
*  Regards*
*  Sandeep Nemuri*


[Spark 1.5.2] Spark dataframes vs sql query -performance parameter ?

2016-05-03 Thread Divya Gehlot
Hi,
I am interested to know on which parameters  we can say Spark data frames
are better  sql queries .
Would be grateful ,If somebody can explain me with the usecases .

Thanks,
Divya


Re: parquet table in spark-sql

2016-05-03 Thread Varadharajan Mukundan
Hi,

Yes, it is not needed. Delimiters are needed only for text files.

On Tue, May 3, 2016 at 12:49 PM, 喜之郎 <251922...@qq.com> wrote:

> hi, I want to ask a question about parquet table in spark-sql table.
>
> I think that parquet have schema information in its own file.
> so you don't need define row separator and column separator in
> create-table DDL, like that:
>
> total_duration  BigInt)
> ROW FORMAT DELIMITED
>   FIELDS TERMINATED BY ','
>   LINES TERMINATED BY '\n'
>
> can anyone give me a answer? thanks
>
>
>


-- 
Thanks,
M. Varadharajan



"Experience is what you get when you didn't get what you wanted"
   -By Prof. Randy Pausch in "The Last Lecture"

My Journal :- http://varadharajan.in


Re: Error from reading S3 in Scala

2016-05-03 Thread Steve Loughran
don't put your secret in the URI, it'll only creep out in the logs.

Use the specific properties coverd in 
http://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html, 
which you can set in your spark context by prefixing them with spark.hadoop.

you can also set the env vars, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY; 
SparkEnv will pick these up and set the relevant spark context keys for you
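
A minimal pyspark sketch of that approach (property names per the hadoop-aws
documentation; the key variables are assumed to be defined elsewhere):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("S3AExample")
        .set("spark.hadoop.fs.s3a.access.key", access_key)   # assumed defined elsewhere
        .set("spark.hadoop.fs.s3a.secret.key", secret_key))  # never hard-code or log these
sc = SparkContext(conf=conf)
rdd = sc.textFile("s3a://graphclustering/config.properties")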


On 3 May 2016, at 01:53, Zhang, Jingyu 
> wrote:

Hi All,

I am using Eclipse with Maven for developing Spark applications. I got a error 
for Reading from S3 in Scala but it works fine in Java when I run them in the 
same project in Eclipse. The Scala/Java code and the error in following


Scala

val uri = URI.create("s3a://" + key + ":" + seckey + "@" + 
"graphclustering/config.properties");
val pt = new Path("s3a://" + key + ":" + seckey + "@" + 
"graphclustering/config.properties");
val fs = FileSystem.get(uri,ctx.hadoopConfiguration);
val  inputStream:InputStream = fs.open(pt);


Exception: on aws-java-1.7.4 and hadoop-aws-2.6.1

Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: 
Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; 
Request ID: 8A56DC7BF0BFF09A), S3 Extended Request ID

at 
com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1160)

at 
com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:748)

at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:467)

at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:302)

at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)

at 
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)

at 
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)

at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:688)

at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:222)

at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)

at com.news.report.graph.GraphCluster$.main(GraphCluster.scala:53)

at com.news.report.graph.GraphCluster.main(GraphCluster.scala)

16/05/03 10:49:17 INFO SparkContext: Invoking stop() from shutdown hook

16/05/03 10:49:17 INFO SparkUI: Stopped Spark web UI at 
http://10.65.80.125:4040

16/05/03 10:49:17 INFO MapOutputTrackerMasterEndpoint: 
MapOutputTrackerMasterEndpoint stopped!

16/05/03 10:49:17 INFO MemoryStore: MemoryStore cleared

16/05/03 10:49:17 INFO BlockManager: BlockManager stopped

Exception: on aws-java-1.7.4 and hadoop-aws-2.7.2

16/05/03 10:23:40 INFO Slf4jLogger: Slf4jLogger started

16/05/03 10:23:40 INFO Remoting: Starting remoting

16/05/03 10:23:40 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkDriverActorSystem@10.65.80.125:61860]

16/05/03 10:23:40 INFO Utils: Successfully started service 
'sparkDriverActorSystem' on port 61860.

16/05/03 10:23:40 INFO SparkEnv: Registering MapOutputTracker

16/05/03 10:23:40 INFO SparkEnv: Registering BlockManagerMaster

16/05/03 10:23:40 INFO DiskBlockManager: Created local directory at 
/private/var/folders/sc/tdmkbvr1705b8p70xqj1kqks5l9p

16/05/03 10:23:40 INFO MemoryStore: MemoryStore started with capacity 1140.4 MB

16/05/03 10:23:40 INFO SparkEnv: Registering OutputCommitCoordinator

16/05/03 10:23:40 INFO Utils: Successfully started service 'SparkUI' on port 
4040.

16/05/03 10:23:40 INFO SparkUI: Started SparkUI at 
http://10.65.80.125:4040

16/05/03 10:23:40 INFO Executor: Starting executor ID driver on host localhost

16/05/03 10:23:40 INFO Utils: Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 61861.

16/05/03 10:23:40 INFO NettyBlockTransferService: Server created on 61861

16/05/03 10:23:40 INFO BlockManagerMaster: Trying to register BlockManager

16/05/03 10:23:40 INFO BlockManagerMasterEndpoint: Registering block manager 
localhost:61861 with 1140.4 MB RAM, BlockManagerId(driver, localhost, 61861)

16/05/03 10:23:40 INFO BlockManagerMaster: Registered BlockManager

Exception in thread "main" java.lang.NoSuchMethodError: 
com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V

at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:285)

at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)

at com.news.report.graph.GraphCluster$.main(GraphCluster.scala:52)

at 

Re: Reading from Amazon S3

2016-05-03 Thread Steve Loughran

On 2 May 2016, at 19:24, Gourav Sengupta 
> wrote:

Jorn,

what aspects are you speaking about ?

My response was absolutely pertinent to Jinan because he will not even face the 
problem if he used Scala. So it was along the lines of helping a person to 
learn fishing that giving him a fish.

Sorry: it will. This is a compatibility issue between Hadoop s3a as implemented in 
hadoop-aws.jar and the Amazon S3 libraries, something you can replicate in 
SparkContext.textFile().

This is a classpath problem, not a language issue


And by the way your blinkered and biased response missed the fact that SPARK 
WAS WRITTEN AND IS WRITTEN IN SCALA.

And runs in the JVM, including much of the core ASF libraries, google guava, 
Hadoop core, Zookeeper, etc. Oh, and hadoop has some C native libraries which 
you should have for Unix performance, and will need for Windows.  There are 
bits of bash and python around, and even groovy in some of the spark-assembly 
JARs, though that's an accident which has been corrected in recent versions.

Languages are tools: the more you know, the more tools you have at your 
disposal.

Now, if you really want to make the problem go away, a patch for 
https://issues.apache.org/jira/browse/HADOOP-13062 would be nice; I promise I 
will review it


Re: Reading from Amazon S3

2016-05-03 Thread Steve Loughran

I'm going to start by letting you know two secret tools we use for diagnosing 
faults; one big data at work, the other a large RDBMS behind a web UI


1. Google
2. The search field in Apache JIRA



Given this is a senior project, these foundational tools are something you are 
going to need to know. It is a lot faster than asking on the mailing list, 
gives you real details, especially in JIRA, including fixes.

In this problem, I would recommend taking the first string of the stack and 
sticking it in google. See what turns up: it may actually be the answer you 
were looking for



On 2 May 2016, at 15:37, Jinan Alhajjaj 
> wrote:


Because I am doing this project for my senior project by using Java.
I try s3a URI as this:

s3a://accessId:secret@bucket/path

It show me an error :
Exception in thread "main" java.lang.NoSuchMethodError: 
com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V



at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at 
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:526)

Date: Thu, 28 Apr 2016 11:19:08 +0100
Subject: Re: Reading from Amazon S3
From: gourav.sengu...@gmail.com
To: ste...@hortonworks.com
CC: yuzhih...@gmail.com; 
j.r.alhaj...@hotmail.com; 
user@spark.apache.org

Why would you use JAVA (create a problem and then try to solve it)? Have you 
tried using Scala or Python or even R?

Regards,
Gourav

On Thu, Apr 28, 2016 at 10:07 AM, Steve Loughran 
> wrote:

On 26 Apr 2016, at 18:49, Ted Yu 
> wrote:

Looking at the cause of the error, it seems hadoop-aws-xx.jar (corresponding to 
the version of hadoop you use) was missing in classpath.

yes, that s3n was moved from hadoop-common to the new hadoop-aws, and without 
realising it broke a lot of things.

you'll need hadoop-aws and jets3t on the classpath

If you are using Hadoop 2.7, I'd recommend s3a instead, which means hadoop-aws 
and the exact same 
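
As an illustration of the version-matching point: hadoop-aws 2.7.x was built
against aws-java-sdk 1.7.4, so pulling in matching artifacts typically looks like

spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.2,com.amazonaws:aws-java-sdk:1.7.4 ...

Mixing a newer SDK with an older hadoop-aws is what produces NoSuchMethodError
failures like the ones shown earlier in this thread.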

parquet table in spark-sql

2016-05-03 Thread 喜之郎
Hi, I want to ask a question about Parquet tables in spark-sql.


I think that Parquet has schema information in its own file,
so you don't need to define a row separator and column separator in the create-table 
DDL, like this:


total_duration  BigInt)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
  LINES TERMINATED BY '\n' 



Can anyone give me an answer? Thanks.

Re: kafka direct streaming python API fromOffsets

2016-05-03 Thread Saisai Shao
I guess the problem is that py4j automatically translates the Python int
into a Java int or long according to the value of the data. If the value is
small it will be translated to a Java int, otherwise it will be translated into
a Java long.

But in java code, the parameter must be long type, so that's the exception
you met.

AFAIK, if you're using python 2, you could specify long type like 123L or
long(123), so this data will be specifically translated into java long.
If you're using python 3, which has no long type, currently I'm sure if
there's a workaround about it.
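
For illustration, a minimal Python 2 sketch (the topic name, partition numbers,
offsets and broker address below are placeholders, and ssc is assumed to be an
existing StreamingContext):

from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

# Build the starting offsets as Python longs so py4j maps them to java.lang.Long.
fromOffsets = {
    TopicAndPartition("my_topic", 0): long(123),
    TopicAndPartition("my_topic", 1): long(234),
}

stream = KafkaUtils.createDirectStream(
    ssc,
    ["my_topic"],
    {"metadata.broker.list": "broker1:9092"},
    fromOffsets=fromOffsets)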

You could refer to python kafka unit test to see the details of using
python api.

Thanks
Saisai



On Tue, May 3, 2016 at 4:11 PM, Tigran Avanesov <
tigran.avane...@olamobile.com> wrote:

> Thank you,
>
> But now I have this error:
>
> java.lang.ClassCastException: java.lang.Integer cannot be cast to
> java.lang.Long
>
> My offsets are actually not big enough to be long. If I put bigger values,
> I have no such exception.
> To me it looks like a bug.
>
> Any ideas for a workaround?
>
> Thanks!
>
>
> On 05/02/2016 06:57 PM, Cody Koeninger wrote:
>
>> If you're confused about the type of an argument, you're probably
>> better off looking at documentation that includes static types:
>>
>>
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$
>>
>> createDirectStream's fromOffsets parameter takes a map from
>> TopicAndPartition to Long.
>>
>> There is documentation for a python constructor for TopicAndPartition:
>>
>>
>> http://spark.apache.org/docs/latest/api/python/_modules/pyspark/streaming/kafka.html#TopicAndPartition
>>
>>
>> On Mon, May 2, 2016 at 5:54 AM, Tigran Avanesov
>>  wrote:
>>
>>> Hi,
>>>
>>> I'm trying to start consuming messages from a kafka topic (via direct
>>> stream) from a given offset.
>>> The documentation of createDirectStream says:
>>>
>>> :param fromOffsets: Per-topic/partition Kafka offsets defining the
>>> (inclusive) starting
>>> point of the stream.
>>>
>>> However it expects a dictionary of topics (not names...), as i tried to
>>> feed
>>> it something like { 'topic' : {0: 123, 1:234}}, and of course got an
>>> exception.
>>> How should I build this fromOffsets parameter?
>>>
>>> Documentation does not say anything about it.
>>> (In general, I think it would be better if the function accepted topic
>>> names)
>>>
>>> Thank you!
>>>
>>> Regards,
>>> Tigran
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
> --
>
> Tigran Avanesov | IT Architect
> phone: +352 261911 3562
> email: tigran.avane...@olamobile.com
> skype: tigran.avanesov.corporate
> post:  Olamobile S.à.r.l.
>2-4 rue Eugène Ruppert
>Bâtiment Vertigo-Polaris
>L-2453 Luxembourg
>Luxembourg
> web:   www.olamobile.com
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: kafka direct streaming python API fromOffsets

2016-05-03 Thread Tigran Avanesov

Thank you,

But now I have this error:

java.lang.ClassCastException: java.lang.Integer cannot be cast to 
java.lang.Long


My offsets are actually not big enough to be long. If I put bigger 
values, I have no such exception.

To me it looks like a bug.

Any ideas for a workaround?

Thanks!

On 05/02/2016 06:57 PM, Cody Koeninger wrote:

If you're confused about the type of an argument, you're probably
better off looking at documentation that includes static types:

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$

createDirectStream's fromOffsets parameter takes a map from
TopicAndPartition to Long.

There is documentation for a python constructor for TopicAndPartition:

http://spark.apache.org/docs/latest/api/python/_modules/pyspark/streaming/kafka.html#TopicAndPartition


On Mon, May 2, 2016 at 5:54 AM, Tigran Avanesov
 wrote:

Hi,

I'm trying to start consuming messages from a kafka topic (via direct
stream) from a given offset.
The documentation of createDirectStream says:

:param fromOffsets: Per-topic/partition Kafka offsets defining the
(inclusive) starting
point of the stream.

However it expects a dictionary of topics (not names...), as i tried to feed
it something like { 'topic' : {0: 123, 1:234}}, and of course got an
exception.
How should I build this fromOffsets parameter?

Documentation does not say anything about it.
(In general, I think it would be better if the function accepted topic
names)

Thank you!

Regards,
Tigran


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



--

Tigran Avanesov | IT Architect
phone: +352 261911 3562
email: tigran.avane...@olamobile.com
skype: tigran.avanesov.corporate
post:  Olamobile S.à.r.l.
   2-4 rue Eugène Ruppert
   Bâtiment Vertigo-Polaris
   L-2453 Luxembourg
   Luxembourg
web:   www.olamobile.com


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark build failure with com.oracle:ojdbc6:jar:11.2.0.1.0

2016-05-03 Thread Mich Talebzadeh
Which version of Spark are you using?

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 3 May 2016 at 02:13, Luciano Resende  wrote:

> You might have a settings.xml that is forcing your internal Maven
> repository to be the mirror of external repositories and thus not finding
> the dependency.
>
> On Mon, May 2, 2016 at 6:11 PM, Hien Luu  wrote:
>
>> No, I am not.  I am considering downloading it manually and placing it in
>> my local repository.
>>
>> On Mon, May 2, 2016 at 5:54 PM, ☼ R Nair (रविशंकर नायर) <
>> ravishankar.n...@gmail.com> wrote:
>>
>>> Oracle JDBC is not part of the Maven repository; are you keeping a
>>> downloaded file in your local repo?
>>>
>>> Best, RS
>>> On May 2, 2016 8:51 PM, "Hien Luu"  wrote:
>>>
 Hi all,

 I am running into a build problem with
 com.oracle:ojdbc6:jar:11.2.0.1.0.  It kept getting "Operation timed out"
 while building Spark Project Docker Integration Tests module (see the error
 below).

 Has anyone run this problem before? If so, how did you resolve around
 this problem?

 [INFO] Reactor Summary:

 [INFO]

 [INFO] Spark Project Parent POM ... SUCCESS [
 2.423 s]

 [INFO] Spark Project Test Tags  SUCCESS [
 0.712 s]

 [INFO] Spark Project Sketch ... SUCCESS [
 0.498 s]

 [INFO] Spark Project Networking ... SUCCESS [
 1.743 s]

 [INFO] Spark Project Shuffle Streaming Service  SUCCESS [
 0.587 s]

 [INFO] Spark Project Unsafe ... SUCCESS [
 0.503 s]

 [INFO] Spark Project Launcher . SUCCESS [
 4.894 s]

 [INFO] Spark Project Core . SUCCESS [
 17.953 s]

 [INFO] Spark Project GraphX ... SUCCESS [
 3.480 s]

 [INFO] Spark Project Streaming  SUCCESS [
 6.022 s]

 [INFO] Spark Project Catalyst . SUCCESS [
 8.664 s]

 [INFO] Spark Project SQL .. SUCCESS [
 12.440 s]

 [INFO] Spark Project ML Local Library . SUCCESS [
 0.498 s]

 [INFO] Spark Project ML Library ... SUCCESS [
 8.594 s]

 [INFO] Spark Project Tools  SUCCESS [
 0.162 s]

 [INFO] Spark Project Hive . SUCCESS [
 9.834 s]

 [INFO] Spark Project HiveContext Compatibility  SUCCESS [
 1.428 s]

 [INFO] Spark Project Docker Integration Tests . FAILURE
 [02:32 min]

 [INFO] Spark Project REPL . SKIPPED

 [INFO] Spark Project Assembly . SKIPPED

 [INFO] Spark Project External Flume Sink .. SKIPPED

 [INFO] Spark Project External Flume ... SKIPPED

 [INFO] Spark Project External Flume Assembly .. SKIPPED

 [INFO] Spark Project External Kafka ... SKIPPED

 [INFO] Spark Project Examples . SKIPPED

 [INFO] Spark Project External Kafka Assembly .. SKIPPED

 [INFO] Spark Project Java 8 Tests . SKIPPED

 [INFO]
 

 [INFO] BUILD FAILURE

 [INFO]
 

 [INFO] Total time: 03:53 min

 [INFO] Finished at: 2016-05-02T17:44:57-07:00

 [INFO] Final Memory: 80M/1525M

 [INFO]
 

 [ERROR] Failed to execute goal on project
 spark-docker-integration-tests_2.11: Could not resolve dependencies for
 project
 org.apache.spark:spark-docker-integration-tests_2.11:jar:2.0.0-SNAPSHOT:
 Failed to collect dependencies at com.oracle:ojdbc6:jar:11.2.0.1.0: Failed
 to read artifact descriptor for com.oracle:ojdbc6:jar:11.2.0.1.0: Could not
 transfer artifact com.oracle:ojdbc6:pom:11.2.0.1.0 from/to uber-artifactory
 (http://artifactory.uber.internal:4587/artifactory/repo/): Connect to
 artifactory.uber.internal:4587 [artifactory.uber.internal/10.162.11.61]
 failed: Operation timed out -> [Help 1]


>>
>>
>> --
>> Regards,
>>
>
>
>
> --
> Luciano Resende
> http://twitter.com/lresende1975
> 

Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Kevin Peng
Davies,

Here is the code that I am typing into the spark-shell along with the
results (my question is at the bottom):

val dps =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("file:///home/ltu/dps_csv/")
val swig =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("file:///home/ltu/swig_csv/")

dps.count
res0: Long = 42694

swig.count
res1: Long = 42034


dps.registerTempTable("dps_pin_promo_lt")
swig.registerTempTable("swig_pin_promo_lt")

sqlContext.sql("select * from dps_pin_promo_lt where date >
'2016-01-03'").count
res4: Long = 42666

sqlContext.sql("select * from swig_pin_promo_lt where date >
'2016-01-03'").count
res5: Long = 34131

sqlContext.sql("select distinct date, account, ad from dps_pin_promo_lt
where date > '2016-01-03'").count
res6: Long = 42533

sqlContext.sql("select distinct date, account, ad from swig_pin_promo_lt
where date > '2016-01-03'").count
res7: Long = 34131


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res9: Long = 23809


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res10: Long = 23809


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res11: Long = 23809


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res12: Long = 23809



From my results above, the counts of distinct values based on the join
criteria and filter criteria for each individual table are shown at res6 and
res7.  My question is why the outer joins are producing fewer rows than the
smaller table; if there are no matches, the outer join should still bring in
that row.  For the full and right outer joins I am expecting to see a minimum
of res6 rows, but I get fewer. Is there something specific that I am missing
here?  I am expecting the full outer join to give me the union of the two
table sets, so I am expecting at least 42533 rows, not 23809.


Gourav,

I just ran this result set on a new session with slightly newer data...
still seeing those results.



Thanks,

KP


On Mon, May 2, 2016 at 11:16 PM, Davies Liu  wrote:

> as @Gourav said, all the join with different join type show the same
> results,
> which meant that all the rows from left could match at least one row from
> right,
> all the rows from right could match at least one row from left, even
> the number of row from left does not equal that of right.
>
> This is correct result.
>
> On Mon, May 2, 2016 at 2:13 PM, Kevin Peng  wrote:
> > Yong,
> >
> > Sorry, let me explain my deduction; it is going to be difficult to get a sample
> > data out since the dataset I am using is proprietary.
> >
> > From the above set queries (ones mentioned in above comments) both inner
> and
> > outer join are producing the same counts.  They are basically pulling out
> > selected columns from the query, but there is no roll up happening or
> > anything that would possibly make it suspicious that there is any
> difference
> > besides the type of joins.  The tables are matched based on three keys
> that
> > are present in both tables (ad, account, and date), on top of this they
> are
> > filtered by date being above 2016-01-03.  Since all the joins are
> producing
> > the same counts, the natural suspicion is that the tables are identical,
> > but I when I run the following two queries:
> >
> > scala> sqlContext.sql("select * from swig_pin_promo_lt where date
> >>='2016-01-03'").count
> >
> > res14: Long = 34158
> >
> > scala> sqlContext.sql("select * from dps_pin_promo_lt where date
> >>='2016-01-03'").count
> >
> > res15: Long = 42693
> >
> >
> > The above two queries filter out the data based on date used by the
> joins of
> > 2016-01-03 and you can see the row count between the two tables are
> > different, which is 

Re: Spark streaming app starts processing when kill that app

2016-05-03 Thread Shams ul Haque
No, I made a cluster of 2 machines, and after submission to the master this
app moves to the slave machine for execution.
I am going to give your suggestion a try by running both on the same
machine.

Thanks
Shams

On Tue, May 3, 2016 at 12:53 PM, hareesh makam 
wrote:

> If you are running your master on a single core, it might be an issue of
> Starvation.
> assuming you are running it locally, try setting master to local[2] or
> higher.
>
> Check the first example at
> https://spark.apache.org/docs/latest/streaming-programming-guide.html
>
> - Hareesh
>
> On 3 May 2016 at 12:35, Shams ul Haque  wrote:
>
>> Hi all,
>>
>> I am facing strange issue when running Spark Streaming app.
>>
>> What i was doing is, When i submit my app by *spark-submit *it works
>> fine and also visible in Spark UI. But it doesn't process any data coming
>> from kafka. And when i kill that app by pressing Ctrl + C on terminal, then
>> it start processing all data received from Kafka and then get shutdown.
>>
>> I am trying to figure out why is this happening. Please help me if you
>> know anything.
>>
>> Thanks and regards
>> Shams ul Haque
>>
>
>


Re: removing header from csv file

2016-05-03 Thread Abhishek Anand
You can use this function to remove the header from your dataset (applicable
to an RDD[String]):

import org.apache.spark.rdd.RDD

// Drop the first line of the first partition, i.e. the CSV header.
// Only the first partition of the file contains the header line.
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex { (idx, lines) =>
    if (idx == 0) lines.drop(1) else lines
  }
}


Abhi

On Wed, Apr 27, 2016 at 12:55 PM, Marco Mistroni 
wrote:

> If you are using the Scala API you can do
> myRdd.zipWithIndex.filter(_._2 > 0).map(_._1)
>
> Maybe a little bit complicated, but it will do the trick.
> As for spark-csv, you will get back a DataFrame which you can convert
> back to an RDD.
> Marco
> On 27 Apr 2016 6:59 am, "nihed mbarek"  wrote:
>
>> You can add a filter on a string that you are sure appears only in the
>> header.
>>
>> Le mercredi 27 avril 2016, Divya Gehlot  a
>> écrit :
>>
>>> Yes, you can remove the header by removing the first row.
>>>
>>> You can use first() or head() to do that.
>>>
>>>
>>> Thanks,
>>> Divya
>>>
>>> On 27 April 2016 at 13:24, Ashutosh Kumar 
>>> wrote:
>>>
 I see there is a library spark-csv which can be used for removing
 header and processing of csv files. But it seems it works with sqlcontext
 only. Is there a way to remove header from csv files without sqlcontext ?

 Thanks
 Ashutosh

>>>
>>>
>>
>> --
>>
>> M'BAREK Med Nihed,
>> Fedora Ambassador, TUNISIA, Northern Africa
>> http://www.nihed.com
>>
>> 
>>
>>
>>


Re: Spark streaming app starts processing when kill that app

2016-05-03 Thread hareesh makam
If you are running your master on a single core, it might be an issue of
starvation.
Assuming you are running it locally, try setting the master to local[2] or
higher.
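
A minimal PySpark sketch of that suggestion (the app name and batch interval
below are arbitrary placeholders):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# local[2] keeps one core free for processing alongside any receiver.
conf = SparkConf().setMaster("local[2]").setAppName("StreamingTest")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)  # 1-second batches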

Check the first example at
https://spark.apache.org/docs/latest/streaming-programming-guide.html

- Hareesh

On 3 May 2016 at 12:35, Shams ul Haque  wrote:

> Hi all,
>
> I am facing strange issue when running Spark Streaming app.
>
> What i was doing is, When i submit my app by *spark-submit *it works fine
> and also visible in Spark UI. But it doesn't process any data coming from
> kafka. And when i kill that app by pressing Ctrl + C on terminal, then it
> start processing all data received from Kafka and then get shutdown.
>
> I am trying to figure out why is this happening. Please help me if you
> know anything.
>
> Thanks and regards
> Shams ul Haque
>


Clear Threshold in Logistic Regression ML Pipeline

2016-05-03 Thread Abhishek Anand
Hi All,

I am trying to build a logistic regression pipeline in ML.

How can I clear the threshold, which by default is 0.5? In MLlib I am able
to clear the threshold to get the raw predictions using the
model.clearThreshold() function.


Regards,
Abhi
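
One possible workaround, sketched under the assumption of the Spark 1.6 ML API
and hypothetical DataFrames named training and test with the usual
label/features columns: rather than clearing a threshold, read the raw scores
from the rawPrediction/probability columns that the fitted model always adds,
since the 0.5 threshold only affects the discrete prediction column.

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10, regParam=0.01)
model = Pipeline(stages=[lr]).fit(training)

scored = model.transform(test)
# "probability" holds the class probabilities, "rawPrediction" the margins;
# neither is affected by the threshold parameter.
scored.select("rawPrediction", "probability", "prediction").show()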


Spark streaming app starts processing when kill that app

2016-05-03 Thread Shams ul Haque
Hi all,

I am facing a strange issue when running a Spark Streaming app.

When I submit my app with *spark-submit* it works fine
and is also visible in the Spark UI, but it doesn't process any data coming
from Kafka. When I kill that app by pressing Ctrl + C on the terminal, it
then starts processing all the data received from Kafka and then shuts down.

I am trying to figure out why this is happening. Please help me if you know
anything.

Thanks and regards
Shams ul Haque


Submit job to spark cluster Error ErrorMonitor dropping message...

2016-05-03 Thread Tenghuan He
Hi

I deployed a Spark cluster with a master and a worker.
The master and worker are both on a VMWare virtual machine, with 1G
memory and 2 cores.
master IP: 192.168.179.133
worker IP: 192.168.179.134
After executing sbin/start-all.sh, the master and the worker start up; visiting
http://192.168.179.133:8080 shows that everything works great.
However, when I submit the job from the master node, I get nothing
except these three lines on the console:
[image: Inline image 1]

and the logs in the master

16/05/02 23:12:28 ERROR ErrorMonitor: dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://sparkMaster@192.168.179.133:7077/]] arriving at [akka.tcp://sparkMaster@192.168.179.133:7077] inbound addresses are [akka.tcp://sparkMaster@chadoop-master:7077]
akka.event.Logging$Error$NoCause$
16/05/02 23:12:48 INFO Master: 192.168.179.133:58793 got disassociated, removing it.
16/05/02 23:12:49 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@192.168.179.133:58793] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
16/05/02 23:12:49 INFO Master: 192.168.179.133:58793 got disassociated, removing it.

Can anyone help me?

Thanks in advance


Tenghuan He


Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Davies Liu
As @Gourav said, all the joins with different join types show the same results,
which means that every row from the left matched at least one row from the right,
and every row from the right matched at least one row from the left, even though
the number of rows on the left does not equal that on the right.

This is the correct result.
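
For readers wondering how the outer-join counts can coincide with the
inner-join count, here is a tiny hedged PySpark illustration (hypothetical
two-row data) of how a WHERE clause that references columns from both sides
interacts with an outer join: unmatched rows come back NULL-extended, and a
predicate such as d.date >= '2016-01-03' is not true for NULL, so those rows
are filtered out.

left = sqlContext.createDataFrame(
    [("2016-01-04", "a1"), ("2016-01-05", "a2")], ["date", "account"])
right = sqlContext.createDataFrame(
    [("2016-01-04", "a1")], ["date", "account"])
left.registerTempTable("l")
right.registerTempTable("r")

# The outer join alone keeps the unmatched left row: 2 rows.
sqlContext.sql("""SELECT * FROM l FULL OUTER JOIN r
                  ON l.date = r.date AND l.account = r.account""").count()

# Filtering on r.date drops the NULL-extended row: 1 row,
# the same count an INNER JOIN would give.
sqlContext.sql("""SELECT * FROM l FULL OUTER JOIN r
                  ON l.date = r.date AND l.account = r.account
                  WHERE l.date >= '2016-01-03' AND r.date >= '2016-01-03'""").count()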

On Mon, May 2, 2016 at 2:13 PM, Kevin Peng  wrote:
> Yong,
>
> Sorry, let me explain my deduction; it is going to be difficult to get a sample
> data out since the dataset I am using is proprietary.
>
> From the above set queries (ones mentioned in above comments) both inner and
> outer join are producing the same counts.  They are basically pulling out
> selected columns from the query, but there is no roll up happening or
> anything that would possibly make it suspicious that there is any difference
> besides the type of joins.  The tables are matched based on three keys that
> are present in both tables (ad, account, and date), on top of this they are
> filtered by date being above 2016-01-03.  Since all the joins are producing
> the same counts, the natural suspicion is that the tables are identical,
> but I when I run the following two queries:
>
> scala> sqlContext.sql("select * from swig_pin_promo_lt where date
>>='2016-01-03'").count
>
> res14: Long = 34158
>
> scala> sqlContext.sql("select * from dps_pin_promo_lt where date
>>='2016-01-03'").count
>
> res15: Long = 42693
>
>
> The above two queries filter out the data based on date used by the joins of
> 2016-01-03 and you can see the row count between the two tables are
> different, which is why I am suspecting something is wrong with the outer
> joins in spark sql, because in this situation the right and outer joins may
> produce the same results, but it should not be equal to the left join and
> definitely not the inner join; unless I am missing something.
>
>
> Side note: In my haste response above I posted the wrong counts for
> dps.count, the real value is res16: Long = 42694
>
>
> Thanks,
>
>
> KP
>
>
>
>
> On Mon, May 2, 2016 at 12:50 PM, Yong Zhang  wrote:
>>
>> We are still not sure what is the problem, if you cannot show us with some
>> example data.
>>
>> For dps with 42632 rows, and swig with 42034 rows, if dps full outer join
>> with swig on 3 columns; with additional filters, get the same resultSet row
>> count as dps lefter outer join with swig on 3 columns, with additional
>> filters, again get the the same resultSet row count as dps right outer join
>> with swig on 3 columns, with same additional filters.
>>
>> Without knowing your data, I cannot see the reason that has to be a bug in
>> the spark.
>>
>> Am I misunderstanding your bug?
>>
>> Yong
>>
>> 
>> From: kpe...@gmail.com
>> Date: Mon, 2 May 2016 12:11:18 -0700
>> Subject: Re: Weird results with Spark SQL Outer joins
>> To: gourav.sengu...@gmail.com
>> CC: user@spark.apache.org
>>
>>
>> Gourav,
>>
>> I wish that was case, but I have done a select count on each of the two
>> tables individually and they return back different number of rows:
>>
>>
>> dps.registerTempTable("dps_pin_promo_lt")
>> swig.registerTempTable("swig_pin_promo_lt")
>>
>>
>> dps.count()
>> RESULT: 42632
>>
>>
>> swig.count()
>> RESULT: 42034
>>
>> On Mon, May 2, 2016 at 11:55 AM, Gourav Sengupta
>>  wrote:
>>
>> This shows that both the tables have matching records and no mismatches.
>> Therefore obviously you have the same results irrespective of whether you
>> use right or left join.
>>
>> I think that there is no problem here, unless I am missing something.
>>
>> Regards,
>> Gourav
>>
>> On Mon, May 2, 2016 at 7:48 PM, kpeng1  wrote:
>>
>> Also, the results of the inner query produced the same results:
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS
>> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
>> =
>> d.ad) WHERE s.date >= '2016-01-03'AND d.date >= '2016-01-03'").count()
>> RESULT:23747
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Weird-results-with-Spark-SQL-Outer-joins-tp26861p26863.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org