Re: Spark 2 and existing code with sqlContext

2016-08-12 Thread Koert Kuipers
you can get it from the SparkSession for backwards compatibility:
val sqlContext = spark.sqlContext
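
In spark-shell that looks like the sketch below (spark is the SparkSession the 2.0 shell creates for you):

val sqlContext = spark.sqlContext
sqlContext.sql("SELECT from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')")
  .collect.foreach(println)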

On Mon, Aug 8, 2016 at 9:11 AM, Mich Talebzadeh 
wrote:

> Hi,
>
> In Spark 1.6.1 this worked
>
> scala> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy
> HH:mm:ss.ss') ").collect.foreach(println)
> [08/08/2016 14:07:22.22]
>
> Spark 2 should give the same result due to backward compatibility?
>
> But I get
>
> scala> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy
> HH:mm:ss.ss') ").collect.foreach(println)
> <console>:24: error: not found: value sqlContext
>        sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy
> HH:mm:ss.ss') ").collect.foreach(println)
>
> Now we can change it to HiveContext and it works
>
> However, what is the best solution if any as we have loads of sqlContext
> in our code?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>


Re: Spark 2 and existing code with sqlContext

2016-08-12 Thread Jacek Laskowski
Hi,

Also, in shell you have sql function available without the object.
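
For example, this should behave the same as the sqlContext version (the 2.x shell runs import spark.sql at startup):

sql("SELECT from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").show()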

Jacek

On 8 Aug 2016 6:11 a.m., "Mich Talebzadeh" 
wrote:

> Hi,
>
> In Spark 1.6.1 this worked
>
> scala> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy
> HH:mm:ss.ss') ").collect.foreach(println)
> [08/08/2016 14:07:22.22]
>
> Spark 2 should give the same result due to backward compatibility?
>
> But I get
>
> scala> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy
> HH:mm:ss.ss') ").collect.foreach(println)
> <console>:24: error: not found: value sqlContext
>        sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy
> HH:mm:ss.ss') ").collect.foreach(println)
>
> Now we can change it to HiveContext and it works
>
> However, what is the best solution if any as we have loads of sqlContext
> in our code?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>


Re: Spark 2 and existing code with sqlContext

2016-08-12 Thread Jacek Laskowski
What about the following :

val sqlContext = spark

?

On 8 Aug 2016 6:11 a.m., "Mich Talebzadeh" 
wrote:

> Hi,
>
> In Spark 1.6.1 this worked
>
> scala> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy
> HH:mm:ss.ss') ").collect.foreach(println)
> [08/08/2016 14:07:22.22]
>
> Spark 2 should give the same result due to backward compatibility?
>
> But I get
>
> scala> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy
> HH:mm:ss.ss') ").collect.foreach(println)
> <console>:24: error: not found: value sqlContext
>        sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy
> HH:mm:ss.ss') ").collect.foreach(println)
>
> Now we can change it to HiveContext and it works
>
> However, what is the best solution if any as we have loads of sqlContext
> in our code?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>


Re: restart spark streaming app

2016-08-12 Thread Jacek Laskowski
Hi,

I think it's cluster deploy mode.

spark-submit --deploy-mode cluster --master yarn myStreamingApp.jar
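
If restart-on-failure is the goal, the YARN-side knobs usually look like this (a sketch; the numbers are placeholders to tune):

spark-submit --deploy-mode cluster --master yarn \
  --conf spark.yarn.maxAppAttempts=4 \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  myStreamingApp.jar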

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Fri, Aug 12, 2016 at 4:38 PM, Shifeng Xiao  wrote:
> Hi folks,
>
> I am using Spark streaming, and I am not clear if there is smart way to
> restart the app once it fails, currently we just have one cron job to check
> if the job is running every  2 or 5 minutes and restart the app when
> necessary.
>
> According to spark streaming guide:
>
> YARN - Yarn supports a similar mechanism for automatically restarting an
> application. Please refer to YARN documentation for more details.
>
>
> Not sure how to achieve that with YARN, anyone can help me on it?
>
> Thanks




Re: Accessing HBase through Spark with Security enabled

2016-08-12 Thread Jacek Laskowski
Hi,

How do you access HBase? What's the version of Spark?

(I don't see spark packages in the stack trace)

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Aug 7, 2016 at 9:02 AM, Aneela Saleem  wrote:
> Hi all,
>
> I'm trying to run a spark job that accesses HBase with security enabled.
> When i run the following command:
>
> /usr/local/spark-2/bin/spark-submit --keytab /etc/hadoop/conf/spark.keytab
> --principal spark/hadoop-master@platalyticsrealm --class
> com.platalytics.example.spark.App --master yarn  --driver-class-path
> /root/hbase-1.2.2/conf /home/vm6/project-1-jar-with-dependencies.jar
>
>
> I get the following error:
>
>
> 2016-08-07 20:43:57,617 WARN
> [hconnection-0x24b5fa45-metaLookup-shared--pool2-t1] ipc.RpcClientImpl:
> Exception encountered while connecting to the server :
> javax.security.sasl.SaslException: GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to find
> any Kerberos tgt)]
> 2016-08-07 20:43:57,619 ERROR
> [hconnection-0x24b5fa45-metaLookup-shared--pool2-t1] ipc.RpcClientImpl: SASL
> authentication failed. The most likely cause is missing or invalid
> credentials. Consider 'kinit'.
> javax.security.sasl.SaslException: GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to find
> any Kerberos tgt)]
>   at
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>   at
> org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
>   at
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:617)
>   at
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$700(RpcClientImpl.java:162)
>   at
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:743)
>   at
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:740)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>   at
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:740)
>   at
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:906)
>   at
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:873)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1241)
>   at
> org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:227)
>   at
> org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:336)
>   at
> org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:34094)
>   at
> org.apache.hadoop.hbase.client.ClientSmallScanner$SmallScannerCallable.call(ClientSmallScanner.java:201)
>   at
> org.apache.hadoop.hbase.client.ClientSmallScanner$SmallScannerCallable.call(ClientSmallScanner.java:180)
>   at
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:210)
>   at
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:360)
>   at
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:334)
>   at
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:136)
>   at
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:65)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: GSSException: No valid credentials provided (Mechanism level:
> Failed to find any Kerberos tgt)
>   at
> sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
>   at
> sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:121)
>   at
> sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
>   at
> sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:223)
>   at 
> sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
>   at 
> sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
>   at
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
>   ... 25 more
>
>

Re: Flattening XML in a DataFrame

2016-08-12 Thread Hyukjin Kwon
Hi Sreekanth,

Assuming you are using Spark 1.x,

I believe this code below:

sqlContext.read.format("com.databricks.spark.xml").option("rowTag",
"emp").load("/tmp/sample.xml")
  .selectExpr("manager.id", "manager.name",
"explode(manager.subordinates.clerk) as clerk")
  .selectExpr("id", "name", "clerk.cid", "clerk.cname")
  .show()

would print the results below as you want:

+---+----+---+-----+
| id|name|cid|cname|
+---+----+---+-----+
|  1| foo|  1|  foo|
|  1| foo|  1|  foo|
+---+----+---+-----+
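
Since the ask is to avoid hard-coding column names, a generic helper along these lines can be layered on top (a minimal sketch: it only handles struct nesting, so arrays such as subordinates.clerk still need an explode() first):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// Repeatedly pull struct fields up one level, renaming them parent_child,
// until no struct columns remain.
def flattenStructs(df: DataFrame): DataFrame = {
  if (!df.schema.fields.exists(_.dataType.isInstanceOf[StructType])) df
  else {
    val cols = df.schema.fields.flatMap { f =>
      f.dataType match {
        case st: StructType =>
          st.fieldNames.toSeq.map(n => col(s"${f.name}.$n").as(s"${f.name}_$n"))
        case _ => Seq(col(f.name))
      }
    }
    flattenStructs(df.select(cols: _*))
  }
}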


I hope this is helpful.

Thanks!




2016-08-13 9:33 GMT+09:00 Sreekanth Jella :

> Hi Folks,
>
>
>
> I am trying flatten variety of XMLs using DataFrames. I’m using spark-xml
> package which is automatically inferring my schema and creating a
> DataFrame.
>
>
>
> I do not want to hard code any column names in DataFrame as I have lot of
> varieties of XML documents and each might be lot more depth of child nodes.
> I simply want to flatten any type of XML and then write output data to a
> hive table. Can you please give some expert advice for the same.
>
>
>
> Example XML and expected output is given below.
>
>
>
> Sample XML:
>
> <emp>
>   <manager>
>     <id>1</id>
>     <name>foo</name>
>     <subordinates>
>       <clerk>
>         <cid>1</cid>
>         <cname>foo</cname>
>       </clerk>
>       <clerk>
>         <cid>1</cid>
>         <cname>foo</cname>
>       </clerk>
>     </subordinates>
>   </manager>
> </emp>
>
> Expected output:
>
> id, name, clerk.cid, clerk.cname
>
> 1, foo, 2, cname2
>
> 1, foo, 3, cname3
>
>
>
> Thanks,
>
> Sreekanth Jella
>
>
>


Re: Spark 2 cannot create ORC table when CLUSTERED. This worked in Spark 1.6.1

2016-08-12 Thread Jacek Laskowski
Hi Mich,

File a JIRA issue as that seems as if they overlooked that part. Spark
2.0 has less and less HiveQL with more and more native support.

(My take on this is that the days of Hive in Spark are counted and
Hive is gonna disappear soon)

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Aug 11, 2016 at 10:02 AM, Mich Talebzadeh
 wrote:
>
>
> This does not work with CLUSTERED BY clause in Spark 2 now!
>
> CREATE TABLE test.dummy2
>  (
>  ID INT
>, CLUSTERED INT
>, SCATTERED INT
>, RANDOMISED INT
>, RANDOM_STRING VARCHAR(50)
>, SMALL_VC VARCHAR(10)
>, PADDING  VARCHAR(10)
> )
> CLUSTERED BY (ID) INTO 256 BUCKETS
> STORED AS ORC
> TBLPROPERTIES ( "orc.compress"="SNAPPY",
> "orc.create.index"="true",
> "orc.bloom.filter.columns"="ID",
> "orc.bloom.filter.fpp"="0.05",
> "orc.stripe.size"="268435456",
> "orc.row.index.stride"="1" )
> scala> HiveContext.sql(sqltext)
> org.apache.spark.sql.catalyst.parser.ParseException:
> Operation not allowed: CREATE TABLE ... CLUSTERED BY(line 2, pos 0)
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>




Re: Single point of failure with Driver host crashing

2016-08-12 Thread Jacek Laskowski
Hi,

I'm sure that cluster deploy mode would solve it very well. It'd be a
cluster issue then to re-execute the driver then?
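
Concretely, something like the following (a sketch; jar name and master URL are placeholders). On YARN the ResourceManager can re-attempt the ApplicationMaster that hosts the driver; on standalone, --supervise asks the master to restart a failed driver:

# YARN cluster mode: the driver runs in the AM, which YARN can re-attempt
spark-submit --master yarn --deploy-mode cluster \
  --conf spark.yarn.maxAppAttempts=2 myApp.jar

# Standalone cluster mode: restart the driver if it dies
spark-submit --master spark://master:7077 --deploy-mode cluster \
  --supervise myApp.jar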

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Aug 11, 2016 at 12:40 PM, Mich Talebzadeh
 wrote:
>
> Hi,
>
> Although Spark is fault tolerant when nodes go down like below:
>
> FROM tmp
> [Stage 1:===>   (20 + 10) /
> 100]16/08/11 20:21:34 ERROR TaskSchedulerImpl: Lost executor 3 on
> xx.xxx.197.216: worker lost
> [Stage 1:>   (44 + 8) /
> 100]
> It can carry on.
>
> However, when the node (the host) that the app was started  on goes down the
> job fails as the driver disappears  as well. Is there a way to avoid this
> single point of failure, assuming what I am stating is valid?
>
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>




Re: Unable to run spark examples in eclipse

2016-08-12 Thread Jacek Laskowski
Hi,

You need to add spark-* libs to the project (using the
Eclipse-specific way). The best bet would be to use maven as the main
project management tool and import the project then.
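
If you instead build the example code as a standalone Maven project, the dependencies look roughly like this (a sketch; adjust the Scala suffix and version to your build):

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.0.0</version>
</dependency>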

Seen 
https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-Eclipse
?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Fri, Aug 12, 2016 at 10:49 AM, subash basnet  wrote:
> Hello all,
>
> I am completely new to spark. I downloaded the spark project from github
> (https://github.com/apache/spark) and wanted to run the examples. I
> successfully ran the maven command:
>
> build/mvn -DskipTests clean package
>
> But I am not able to build the spark-examples_2.11 project. There are import
> issues in the java files like below:
> Eg: The import org.apache.spark.SparkConf cannot be resolved
> The import org.apache.spark.api.java.JavaSparkContext cannot be resolved
> and so on displayed in eclipse.
>
> What am I doing wrong to run these examples?
>
>
> Best Regards,
> Subash Basnet




Re: Why I can't use broadcast var defined in a global object?

2016-08-12 Thread yaochunnan
Hi David, 

Thank you for detailed reply. I understand what you said about the ideas on
broadcast variable. But I am still a little bit confused. In your reply, you
said:

*It has sent largeValue across the network to each worker already, and gave
you a/ key /to retrieve it.*

So my question is, now I've put the* key*, which is  var br_value in a
global singleton (globalObject), why I can't invoke my largeValue later in a
closure by calling the key, which is globalObject.br_value.value  ?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-I-can-t-use-broadcast-var-defined-in-a-global-object-tp27523p27525.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Using spark package XGBoost

2016-08-12 Thread janardhan shetty
I tried using the sparkxgboost package in build.sbt but it failed.
Spark 2.0
Scala 2.11.8

Error:
 [warn]
http://dl.bintray.com/spark-packages/maven/rotationsymmetry/sparkxgboost/0.2.1-s_2.10/sparkxgboost-0.2.1-s_2.10-javadoc.jar
   [warn] ::
   [warn] ::  FAILED DOWNLOADS::
   [warn] :: ^ see resolution messages for details  ^ ::
   [warn] ::
   [warn] ::
rotationsymmetry#sparkxgboost;0.2.1-s_2.10!sparkxgboost.jar(src)
   [warn] ::
rotationsymmetry#sparkxgboost;0.2.1-s_2.10!sparkxgboost.jar(doc)

build.sbt:

scalaVersion := "2.11.8"

libraryDependencies ++= {
  val sparkVersion = "2.0.0-preview"
  Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
  )
}



resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"

libraryDependencies += "rotationsymmetry" % "sparkxgboost" % "0.2.1-s_2.10"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "MANIFEST.MF")   =>
MergeStrategy.discard
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
  case "application.conf"=> MergeStrategy.concat
  case "unwanted.txt"=>
MergeStrategy.discard

  case x => val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)

}




On Fri, Aug 12, 2016 at 3:35 PM, janardhan shetty 
wrote:

> Is there a dataframe version of XGBoost in spark-ml ?.
> Has anyone used sparkxgboost package ?
>


Accessing SparkConfig from mapWithState function

2016-08-12 Thread Govindasamy, Nagarajan
Hi,


Is there a way to get access to SparkConfig from mapWithState function?  I am 
looking to implement logic using the config property in mapWithState function.
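
SparkConf itself isn't reachable from inside the state function (it runs on the executors), but a value read from it on the driver can simply be captured by the closure, or broadcast if it is large. A minimal sketch (ssc, keyedStream and spark.myapp.threshold are made-up names for illustration):

import org.apache.spark.streaming.{State, StateSpec}

// read the setting once on the driver; the closure below captures the plain value
val threshold = ssc.sparkContext.getConf.get("spark.myapp.threshold", "10").toInt

val mappingFunc = (key: String, value: Option[Int], state: State[Int]) => {
  val total = state.getOption.getOrElse(0) + value.getOrElse(0)
  state.update(total)
  (key, total > threshold)   // config-driven logic
}

// keyedStream is assumed to be a DStream[(String, Int)]
val stateful = keyedStream.mapWithState(StateSpec.function(mappingFunc))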


Thanks,


Raj


Why I can't use broadcast var defined in a global object?

2016-08-12 Thread yaochunnan
Hi all, 

Here is a simplified example to show my concern. This example contains 3
files with 3 objects, depending on spark 1.6.1.

//file globalObject.scala
import org.apache.spark.broadcast.Broadcast

object globalObject {
  var br_value: Broadcast[Map[Int, Double]] = null
}


//file someFunc.scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object someFunc {
  def go(rdd: RDD[Int])(implicit sc: SparkContext): Array[Int] = {
rdd.map(i => {
  val acc = globalObject.br_value.value
  if(acc.contains(i)) {
i + 1
  } else {
i
  }
}).take(100)
  }
}

//testMain.scala
import org.apache.spark.{SparkConf, SparkContext}

object testMain {
  def bootStrap()(implicit sc:SparkContext): Unit = {
globalObject.br_value = sc.broadcast(Map(1->2, 2->3, 4->5))
  }

  def main(args: Array[String]): Unit = {
lazy val appName = getClass.getSimpleName.split("\\$").last
implicit val sc = new SparkContext(new SparkConf().setAppName(appName))
val datardd = sc.parallelize(Range(0, 200), 200)
  .flatMap(i => Range(0, 1000))

bootStrap()
someFunc.go(datardd).foreach(println)

  }
}


When I run this code on cluster, it gives me the following error:
ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
at someFunc$$anonfun$go$1.apply$mcII$sp(someFunc.scala:7)
at someFunc$$anonfun$go$1.apply(someFunc.scala:6)
at someFunc$$anonfun$go$1.apply(someFunc.scala:6)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

Apparently, the data is not successfully broadcasted. I met this problem
when I was refactoring my code these days. I want different scala objects to
share a same broadcast variable. But now here it is. Pretty confusing now,
as to my understanding driver uses pointer to indicate broadcast variable.
Calling broadcast variable shouldn't be restricted to the same code scope. 

Correct me if I am wrong. And what's the proper way to share broadcast var
among scala objects ? Thanks in advance.
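
One pattern that does work (a sketch, not from the thread): pass the Broadcast handle explicitly so the closure captures it and deserializes it on the executors, instead of reading it from a singleton that was only initialized in the driver JVM:

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

object someFunc {
  // take the handle as a parameter instead of reaching into globalObject
  def go(rdd: RDD[Int], br: Broadcast[Map[Int, Double]]): Array[Int] =
    rdd.map { i =>
      val acc = br.value
      if (acc.contains(i)) i + 1 else i
    }.take(100)
}

// in testMain:
//   val br = sc.broadcast(Map(1 -> 2.0, 2 -> 3.0, 4 -> 5.0))
//   someFunc.go(datardd, br).foreach(println)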






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-I-can-t-use-broadcast-var-defined-in-a-global-object-tp27523.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Flattening XML in a DataFrame

2016-08-12 Thread Sreekanth Jella
Hi Folks,

 

I am trying flatten variety of XMLs using DataFrames. I'm using spark-xml
package which is automatically inferring my schema and creating a DataFrame.


 

I do not want to hard code any column names in DataFrame as I have lot of
varieties of XML documents and each might be lot more depth of child nodes.
I simply want to flatten any type of XML and then write output data to a
hive table. Can you please give some expert advice for the same.

 

Example XML and expected output is given below.

 

Sample XML:

<emp>
  <manager>
    <id>1</id>
    <name>foo</name>
    <subordinates>
      <clerk>
        <cid>1</cid>
        <cname>foo</cname>
      </clerk>
      <clerk>
        <cid>1</cid>
        <cname>foo</cname>
      </clerk>
    </subordinates>
  </manager>
</emp>
 

Expected output:

id, name, clerk.cid, clerk.cname

1, foo, 2, cname2

1, foo, 3, cname3

 

Thanks,

Sreekanth Jella

 



Re: dataframe row list question

2016-08-12 Thread ayan guha
You can use dot notations.

select myList.vList from t where myList.nm='IP'
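
Since myList is an array of structs, exploding it first is another option (a sketch; df is the dataframe you built from the JSON stream, and the value is 'Ip' as it appears in the sample rows):

import org.apache.spark.sql.functions.{col, explode}

val entries = df.select(explode(col("myList")).as("entry"))
entries.where(col("entry.nm") === "Ip")
  .select(col("entry.vList"))
  .show()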

On Fri, Aug 12, 2016 at 9:11 AM, vr spark  wrote:

> Hi Experts,
>  Please suggest
>
> On Thu, Aug 11, 2016 at 7:54 AM, vr spark  wrote:
>
>>
>> I have data which is json in this format
>>
>>  myList: array
>>  |||-- elem: struct
>>  ||||-- nm: string (nullable = true)
>>  ||||-- vList: array (nullable = true)
>>  |||||-- element: string (containsNull = true)
>>
>>
>>  from my kafka stream, i created a dataframe using sqlContext.jsonRDD
>>  Then registred it as registerTempTable
>>  selected mylist from this table and i see this output. It is a list of
>> rows
>>
>> [Row(nm=u'Apt', vList=[u'image']), Row(nm=u'Agent', vList=[u'Mozilla/5.0
>> ']), Row(nm=u'Ip', vList=[u'xx.yy.106.25'])]
>>
>>  My requirement is to get only rows with nm='IP' and its corresponding
>> value
>> I would need IP, xx.yy.106.25
>>
>>
>> Please suggest
>>
>
>


-- 
Best Regards,
Ayan Guha


restart spark streaming app

2016-08-12 Thread Shifeng Xiao
Hi folks,

I am using Spark streaming, and I am not clear if there is smart way to
restart the app once it fails, currently we just have one cron job to check
if the job is running every  2 or 5 minutes and restart the app when
necessary.

According to spark streaming guide:

   - *YARN* - Yarn supports a similar mechanism for automatically
   restarting an application. Please refer to YARN documentation for more
   details.


Not sure how to achieve that with YARN, anyone can help me on it?

Thanks


Using spark package XGBoost

2016-08-12 Thread janardhan shetty
Is there a dataframe version of XGBoost in spark-ml ?.
Has anyone used sparkxgboost package ?


Re: Spark 2.0.0 JaninoRuntimeException

2016-08-12 Thread dhruve ashar
I see a similar issue being resolved recently:
https://issues.apache.org/jira/browse/SPARK-15285

On Fri, Aug 12, 2016 at 3:33 PM, Aris  wrote:

> Hello folks,
>
> I'm on Spark 2.0.0 working with Datasets -- and despite the fact that
> smaller data unit tests work on my laptop, when I'm on a cluster, I get
> cryptic error messages:
>
> Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method
>> "(Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/
>> apache/spark/sql/catalyst/InternalRow;)I" of class "org.apache.spark.sql.
>> catalyst.expressions.GeneratedClass$SpecificOrdering" grows beyond 64 KB
>>
>
> Unfortunately I'm not clear on how to even isolate the source of this
> problem. I didn't have this problem in Spark 1.6.1.
>
> Any clues?
>



-- 
-Dhruve Ashar


Re: Rebalancing when adding kafka partitions

2016-08-12 Thread Cody Koeninger
Hrrm, that's interesting. Did you try with subscribe pattern, out of
curiosity?

I haven't tested repartitioning on the underlying new Kafka consumer, so
it's possible I misunderstood something.
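
For reference, the pattern-based variant would look roughly like this (a sketch; the topic regex is a placeholder, and with the new consumer the speed at which added partitions show up is also subject to its metadata.max.age.ms refresh):

import java.util.regex.Pattern
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.SubscribePattern
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
  ssc, PreferConsistent,
  SubscribePattern[Array[Byte], Array[Byte]](Pattern.compile("mytopic.*"), kafkaParams))
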
On Aug 12, 2016 2:47 PM, "Srikanth"  wrote:

> I did try a test with spark 2.0 + spark-streaming-kafka-0-10-assembly.
> Partition was increased using "bin/kafka-topics.sh --alter" after spark
> job was started.
> I don't see messages from new partitions in the DStream.
>
> KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
>> ssc, PreferConsistent, Subscribe[Array[Byte],
>> Array[Byte]](topics, kafkaParams) )
>> .map(r => (r.key(), r.value()))
>
>
> Also, no.of partitions did not increase too.
>
>> dataStream.foreachRDD( (rdd, curTime) => {
>>  logger.info(s"rdd has ${rdd.getNumPartitions} partitions.")
>
>
> Should I be setting some parameter/config? Is the doc for new integ
> available?
>
> Thanks,
> Srikanth
>
> On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger 
> wrote:
>
>> No, restarting from a checkpoint won't do it, you need to re-define the
>> stream.
>>
>> Here's the jira for the 0.10 integration
>>
>> https://issues.apache.org/jira/browse/SPARK-12177
>>
>> I haven't gotten docs completed yet, but there are examples at
>>
>> https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10
>>
>> On Fri, Jul 22, 2016 at 1:05 PM, Srikanth  wrote:
>> > In Spark 1.x, if we restart from a checkpoint, will it read from new
>> > partitions?
>> >
>> > If you can, pls point us to some doc/link that talks about Kafka 0.10
>> integ
>> > in Spark 2.0.
>> >
>> > On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger 
>> wrote:
>> >>
>> >> For the integration for kafka 0.8, you are literally starting a
>> >> streaming job against a fixed set of topicapartitions,  It will not
>> >> change throughout the job, so you'll need to restart the spark job if
>> >> you change kafka partitions.
>> >>
>> >> For the integration for kafka 0.10 / spark 2.0, if you use subscribe
>> >> or subscribepattern, it should pick up new partitions as they are
>> >> added.
>> >>
>> >> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth 
>> wrote:
>> >> > Hello,
>> >> >
>> >> > I'd like to understand how Spark Streaming(direct) would handle Kafka
>> >> > partition addition?
>> >> > Will a running job be aware of new partitions and read from it?
>> >> > Since it uses Kafka APIs to query offsets and offsets are handled
>> >> > internally.
>> >> >
>> >> > Srikanth
>> >
>> >
>>
>
>


Spark 2.0.0 JaninoRuntimeException

2016-08-12 Thread Aris
Hello folks,

I'm on Spark 2.0.0 working with Datasets -- and despite the fact that
smaller data unit tests work on my laptop, when I'm on a cluster, I get
cryptic error messages:

Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method
> "(Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;)I"
> of class
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering"
> grows beyond 64 KB
>

Unfortunately I'm not clear on how to even isolate the source of this
problem. I didn't have this problem in Spark 1.6.1.

Any clues?


Re: Rebalancing when adding kafka partitions

2016-08-12 Thread Srikanth
I did try a test with spark 2.0 + spark-streaming-kafka-0-10-assembly.
Partition was increased using "bin/kafka-topics.sh --alter" after spark job
was started.
I don't see messages from new partitions in the DStream.

KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
> ssc, PreferConsistent, Subscribe[Array[Byte], Array[Byte]](topics,
> kafkaParams) )
> .map(r => (r.key(), r.value()))


Also, no.of partitions did not increase too.

> dataStream.foreachRDD( (rdd, curTime) => {
>  logger.info(s"rdd has ${rdd.getNumPartitions} partitions.")


Should I be setting some parameter/config? Is the doc for new integ
available?

Thanks,
Srikanth

On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger  wrote:

> No, restarting from a checkpoint won't do it, you need to re-define the
> stream.
>
> Here's the jira for the 0.10 integration
>
> https://issues.apache.org/jira/browse/SPARK-12177
>
> I haven't gotten docs completed yet, but there are examples at
>
> https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10
>
> On Fri, Jul 22, 2016 at 1:05 PM, Srikanth  wrote:
> > In Spark 1.x, if we restart from a checkpoint, will it read from new
> > partitions?
> >
> > If you can, pls point us to some doc/link that talks about Kafka 0.10
> integ
> > in Spark 2.0.
> >
> > On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger 
> wrote:
> >>
> >> For the integration for kafka 0.8, you are literally starting a
> >> streaming job against a fixed set of topicapartitions,  It will not
> >> change throughout the job, so you'll need to restart the spark job if
> >> you change kafka partitions.
> >>
> >> For the integration for kafka 0.10 / spark 2.0, if you use subscribe
> >> or subscribepattern, it should pick up new partitions as they are
> >> added.
> >>
> >> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth 
> wrote:
> >> > Hello,
> >> >
> >> > I'd like to understand how Spark Streaming(direct) would handle Kafka
> >> > partition addition?
> >> > Will a running job be aware of new partitions and read from it?
> >> > Since it uses Kafka APIs to query offsets and offsets are handled
> >> > internally.
> >> >
> >> > Srikanth
> >
> >
>


Re: Grid Search using Spark MLLib Pipelines

2016-08-12 Thread Adamantios Corais

Great.

I like your second solution. But how can I make sure that cvModel holds
the best model overall (as opposed to the last one that was tried out
by the grid search)?


In addition, do you have an idea how to collect the average error of 
each grid search (here 1x1x1)?
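
On the metrics question, CrossValidatorModel keeps the averaged evaluator metric for each parameter combination (and bestModel is the one refit with the best of them, not simply the last one tried). A sketch, with paramGrid as defined in the quoted code:

// one entry per ParamMap, in the same order as the grid
cvModel.avgMetrics.zip(paramGrid).foreach { case (metric, params) =>
  println(s"$metric  <-  $params")
}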




On 12/08/2016 08:59 μμ, Bryan Cutler wrote:
You will need to cast bestModel to include the MLWritable trait.  The 
class Model does not mix it in by default.  For instance:


cvModel.bestModel.asInstanceOf[MLWritable].save("/my/path")

Alternatively, you could save the CV model directly, which takes care 
of this


cvModel.save("/my/path")

On Fri, Aug 12, 2016 at 9:17 AM, Adamantios Corais 
> wrote:


Hi,

Assuming that I have run the following pipeline and have got the
best logistic regression model. How can I then save that model for
later use? The following command throws an error:

cvModel.bestModel.save("/my/path")

Also, is it possible to get the error (a collection of) for each
combination of parameters?

I am using spark 1.6.2

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}

val lr = new LogisticRegression()

val pipeline = new Pipeline().
setStages(Array(lr))

val paramGrid = new ParamGridBuilder().
addGrid(lr.elasticNetParam , Array(0.1)).
addGrid(lr.maxIter , Array(10)).
addGrid(lr.regParam , Array(0.1)).
build()

val cv = new CrossValidator().
setEstimator(pipeline).
setEvaluator(new BinaryClassificationEvaluator).
setEstimatorParamMaps(paramGrid).
setNumFolds(2)

val cvModel = cv.
fit(training)


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org







Re: countDistinct, partial aggregates and Spark 2.0

2016-08-12 Thread Lee Becker
On Fri, Aug 12, 2016 at 11:55 AM, Lee Becker  wrote:

> val df = sc.parallelize(Array(("a", "a"), ("b", "c"), ("c",
> "a"))).toDF("x", "y")
> val grouped = df.groupBy($"x").agg(countDistinct($"y"), collect_set($"y"))
>

This workaround executes with no exceptions:
val grouped = df.groupBy($"x").agg(size(collect_set($"y")),
collect_set($"y"))

In this example countDistinct and collect_set are running on the same
column and thus the result of countDistinct is essentially redundant.
Assuming they were running on different columns (say there was column 'z'
too), is there anything distinct computationally between countDistinct and
size(collect_set(...))?

-- 
*hapara* ● Making Learning Visible
1877 Broadway Street, Boulder, CO 80302
(Google Voice): +1 720 335 5332
www.hapara.com   Twitter: @hapara_team 


Re: KafkaUtils.createStream not picking smallest offset

2016-08-12 Thread Cody Koeninger
Are you checkpointing?


Beyond that, why are you using createStream instead of createDirectStream
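
For the record, the direct 0.8-style stream looks roughly like this (a sketch; broker list and topic are placeholders), and since it tracks offsets itself, auto.offset.reset applies whenever no offsets are available:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092",
  "auto.offset.reset"    -> "smallest")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))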

On Fri, Aug 12, 2016 at 12:32 PM, Diwakar Dhanuskodi
 wrote:
> Okay .
> I could  delete  the  consumer group in zookeeper and  start  again to  re
> use same consumer group name. But  this  is  not  working  though . Somehow
> createstream  is  picking  the  offset from  some where other than
> /consumers/ from  zookeeper
>
>
> Sent from Samsung Mobile.
>
>
>
>
>
>
>
>
>  Original message 
> From: Cody Koeninger 
> Date:12/08/2016 18:02 (GMT+05:30)
> To: Diwakar Dhanuskodi 
> Cc:
> Subject: Re: KafkaUtils.createStream not picking smallest offset
>
> Auto offset reset only applies if there aren't offsets available otherwise.
>
> The old high level consumer stores offsets in zookeeper.
>
> If you want to make sure you're starting clean, use a new consumer group
> I'd.
>
> On Aug 12, 2016 3:35 AM, "Diwakar Dhanuskodi" 
> wrote:
>>
>>
>> Hi,
>> We are  using  spark  1.6.1 and  kafka 0.9.
>>
>> KafkaUtils.createStream is  showing strange behaviour. Though
>> auto.offset.reset is  set  to  smallest .  Whenever we  need  to  restart
>> the  stream it  is  picking up  the  latest  offset which  is not  expected.
>> Do  we  need  to  set  any  other  properties ?.
>>
>> createDirectStream works fine  in  this  above  case.
>>
>>
>> Sent from Samsung Mobile.




Re: Grid Search using Spark MLLib Pipelines

2016-08-12 Thread Bryan Cutler
You will need to cast bestModel to include the MLWritable trait.  The class
Model does not mix it in by default.  For instance:

cvModel.bestModel.asInstanceOf[MLWritable].save("/my/path")

Alternatively, you could save the CV model directly, which takes care of
this

cvModel.save("/my/path")

On Fri, Aug 12, 2016 at 9:17 AM, Adamantios Corais <
adamantios.cor...@gmail.com> wrote:

> Hi,
>
> Assuming that I have run the following pipeline and have got the best
> logistic regression model. How can I then save that model for later use?
> The following command throws an error:
>
> cvModel.bestModel.save("/my/path")
>
> Also, is it possible to get the error (a collection of) for each
> combination of parameters?
>
> I am using spark 1.6.2
>
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.classification.LogisticRegression
> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
> import org.apache.spark.ml.tuning.{ParamGridBuilder , CrossValidator}
>
> val lr = new LogisticRegression()
>
> val pipeline = new Pipeline().
> setStages(Array(lr))
>
> val paramGrid = new ParamGridBuilder().
> addGrid(lr.elasticNetParam , Array(0.1)).
> addGrid(lr.maxIter , Array(10)).
> addGrid(lr.regParam , Array(0.1)).
> build()
>
> val cv = new CrossValidator().
> setEstimator(pipeline).
> setEvaluator(new BinaryClassificationEvaluator).
> setEstimatorParamMaps(paramGrid).
> setNumFolds(2)
>
> val cvModel = cv.
> fit(training)
>
>
>
>


countDistinct, partial aggregates and Spark 2.0

2016-08-12 Thread Lee Becker
Hi everyone,

I've started experimenting with my codebase to see how much work I will
need to port it from 1.6.1 to 2.0.0.  In regressing some of my dataframe
transforms, I've discovered I can no longer pair a countDistinct with a
collect_set in the same aggregation.

Consider:

val df = sc.parallelize(Array(("a", "a"), ("b", "c"), ("c",
"a"))).toDF("x", "y")
val grouped = df.groupBy($"x").agg(countDistinct($"y"), collect_set($"y"))

When it comes time to execute (via collect or show).  I get the following
error:

*java.lang.RuntimeException: Distinct columns cannot exist in Aggregate
> operator containing aggregate functions which don't support partial
> aggregation.*


I never encountered this behavior in previous Spark versions.  Are there
workarounds that don't require computing each aggregation separately and
joining later?  Is there a partial aggregation version of collect_set?

Thanks,
Lee


Unable to run spark examples in eclipse

2016-08-12 Thread subash basnet
Hello all,

I am completely new to spark. I downloaded the spark project from github (
https://github.com/apache/spark) and wanted to run the examples. I
successfully ran the maven command:

build/mvn -DskipTests clean package

But I am not able to build the spark-examples_2.11 project. There are
import issues in the java files like below:
Eg: The import org.apache.spark.SparkConf cannot be resolved
The import org.apache.spark.api.java.JavaSparkContext cannot be resolved
and so on displayed in eclipse.

What am I doing wrong to run these examples?


Best Regards,
Subash Basnet


Mailing list

2016-08-12 Thread Inam Ur Rehman
UNSUBSCRIBE


How to add custom steps to Pipeline models?

2016-08-12 Thread evanzamir
I'm building an LDA Pipeline, currently with 4 steps, Tokenizer,
StopWordsRemover, CountVectorizer, and LDA. I would like to add more steps,
for example, stemming and lemmatization, and also 1-gram and 2-grams (which
I believe is not supported by the default NGram class). Is there a way to
add these steps? In sklearn, you can create classes with fit() and
transform() methods, and that should be enough. Is that true in Spark ML as
well (or something similar)? 
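
Custom stages are possible by extending Transformer (or Estimator for stages that need fitting). A minimal 1.6-style skeleton just to show the shape; the actual stemming logic is left out:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

class Stemmer(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("stemmer"))

  override def transform(df: DataFrame): DataFrame = {
    // apply a stemming UDF to the input column here
    df
  }
  override def transformSchema(schema: StructType): StructType = schema
  override def copy(extra: ParamMap): Stemmer = defaultCopy(extra)
}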



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-custom-steps-to-Pipeline-models-tp27522.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Grid Search using Spark MLLib Pipelines

2016-08-12 Thread Adamantios Corais

Hi,

Assuming that I have run the following pipeline and have got the best logistic 
regression model. How can I then save that model for later use? The following 
command throws an error:

cvModel.bestModel.save("/my/path")

Also, is it possible to get the error (a collection of) for each combination of 
parameters?

I am using spark 1.6.2

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder , CrossValidator}

val lr = new LogisticRegression()

val pipeline = new Pipeline().
setStages(Array(lr))

val paramGrid = new ParamGridBuilder().
addGrid(lr.elasticNetParam , Array(0.1)).
addGrid(lr.maxIter , Array(10)).
addGrid(lr.regParam , Array(0.1)).
build()

val cv = new CrossValidator().
setEstimator(pipeline).
setEvaluator(new BinaryClassificationEvaluator).
setEstimatorParamMaps(paramGrid).
setNumFolds(2)

val cvModel = cv.
fit(training)





PySpark read from HBase

2016-08-12 Thread Bin Wang
Hi there,

I have lots of raw data in several Hive tables where we built a workflow to
"join" those records together and restructured into HBase. It was done
using plain MapReduce to generate HFile, and then load incremental from
HFile into HBase to guarantee the best performance.

However, we need to do some time series analysis for each of the record in
HBase, but the implementation was done in Python (pandas, scikit learn)
which is pretty time-consuming to reproduce in Java, Scala.

I am thinking PySpark is probably the best approach if it works.
Can pyspark read from HFile directory? or can it read from HBase in
parallel?
I don't see that many examples out there so any help or guidance will be
appreciated.

Also, we are using Cloudera Hadoop so there might be a slight delay with
the latest Spark release.

Best regards,

Bin


[Spark 2.0] spark.sql.hive.metastore.jars doesn't work

2016-08-12 Thread Yan Facai
Hi, everyone.

According the official guide, I copied hdfs-site.xml, core-site.xml and
hive-site.xml to $SPARK_HOME/conf, and write code as below:

```Java
SparkSession spark = SparkSession
.builder()
.appName("Test Hive for Spark")
.config("spark.sql.hive.metastore.version", "0.13.1")
.config("spark.sql.hive.metastore.jars",
        "/data0/facai/lib/hive-0.13.1/lib:/data0/facai/lib/hadoop-2.4.1/share/hadoop")
.enableHiveSupport()
.getOrCreate();
```


When I use spark-submit to submit tasks to yarn, and run as **client**
mode,  ClassNotFoundException is thrown,  and some details of logs are list
below:
```
16/08/12 17:07:28 INFO execution.SparkSqlParser: Parsing command: SHOW
TABLES
16/08/12 17:07:30 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint:
Registered executor NettyRpcEndpointRef(null) (10.77.113.154:52806) with ID
1
16/08/12 17:07:31 INFO storage.BlockManagerMasterEndpoint: Registering
block manager h113154.mars.grid.sina.com.cn:44756 with 912.3 MB RAM,
BlockManagerId(1, h113154.mars.grid.sina.com.cn, 44756)
16/08/12 17:07:32 INFO hive.HiveUtils: Initializing HiveMetastoreConnection
version 0.13.1 using file:/data0/facai/lib/hive-0.1
3.1/lib:file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
16/08/12 17:07:32 ERROR yarn.ApplicationMaster: User class threw exception:
java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError:
org/apache/hadoop/hive/ql/session/SessionState when creating Hive client
using classpath: file:/data0/facai/lib/hive-0.13.1/lib,
file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
```

However, all the jars needed by hive is indeed in the dir:
```Bash
[hadoop@h107713699 spark_test]$ ls /data0/facai/lib/hive-0.13.1/lib/ | grep
hive
hive-ant-0.13.1.jar
hive-beeline-0.13.1.jar
hive-cli-0.13.1.jar
hive-common-0.13.1.jar
hive-contrib-0.13.1.jar
hive-exec-0.13.1.jar
hive-hbase-handler-0.13.1.jar
hive-hwi-0.13.1.jar
hive-jdbc-0.13.1.jar
hive-metastore-0.13.1.jar
hive-serde-0.13.1.jar
hive-service-0.13.1.jar
hive-shims-0.13.1.jar
hive-shims-0.20-0.13.1.jar
hive-shims-0.20S-0.13.1.jar
hive-shims-0.23-0.13.1.jar
hive-shims-common-0.13.1.jar
hive-shims-common-secure-0.13.1.jar
hive-testutils-0.13.1.jar
```

So,
I wonder why spark cannot find the jars needed?

Any help will be appreciated, thanks.
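
One thing worth checking (an assumption on my part, not verified against your setup): spark.sql.hive.metastore.jars is interpreted as a JVM-style classpath, and a bare directory on a classpath only contributes .class files, not the jars inside it, so the entries may need a /* wildcard, for example:

.config("spark.sql.hive.metastore.jars",
        "/data0/facai/lib/hive-0.13.1/lib/*:/data0/facai/lib/hadoop-2.4.1/share/hadoop/common/*")

(The exact Hadoop sub-directories to include are a guess here.)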


RE: Spark join and large temp files

2016-08-12 Thread Ashic Mahtab
Hi Gourav,

Thanks for your input. As mentioned previously, we've tried the broadcast. We've
failed to broadcast 1.5GB... perhaps some tuning can help. We see CPU go up to
100%, and then workers die during the broadcast. I'm not sure if it's a good idea
to broadcast that much, as Spark's broadcast hint by default uses a threshold of
just 10MB to decide whether to broadcast or not.

As for Redis, we're not needing a separate Redis cluster or anything. We're using
embedded Redis on the driver that lives for the duration of the job. It's
essentially a way to have some memory on the driver that can accommodate 1.5GB
and allows access over the network. https://github.com/kstyrc/embedded-redis
makes this trivial to do.

I don't know if this is a 2011 way of solving this problem or not, but
http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs
seems to suggest that a good approach to joining a huge dataset with one that
can't be made smaller is using a database. We've gone by that, and it seems to be
working. We've tried all the other recommendations (broadcast the dataframe as
part of the join, explicitly broadcast a hashmap from the driver, register temp
tables, etc.) - and nothing else has worked. The parquet dataframe doesn't have a
partitioner when loaded, and any sort of operation requiring a network shuffle
causes temp disk fill up. Within these constraints, the database approach turned
out to be the only thing we could get working (without paying double / treble for
nodes that have more disk space to hold the temp files).

Regards,
Ashic.

From: gourav.sengu...@gmail.com
Date: Thu, 11 Aug 2016 21:52:07 +0100
Subject: Re: Spark join and large temp files
To: bteeu...@gmail.com
CC: user@spark.apache.org

The point is that if you have skewed data then one single reducer will finally
take a very long time, and you do not need to try this even, just search in
Google and skewed data is a known problem in joins even in SPARK.

Therefore instead of using join, in case the use case permits, just write a
UDF, which then works as a look up. Using broadcast is the SPARK way, and
someone mentioned here the use of Redis, which I remember used to be the way
around in 2011 in the initial days of MR.

Regards,
Gourav

On Thu, Aug 11, 2016 at 9:24 PM, Ben Teeuwen  wrote:

Hmm, hashing will probably send all of the records with the same key to the
same partition / machine. I’d try it out, and hope that if you have a few
superlarge keys bigger than the RAM available of one node, they spill to disk.
Maybe play with persist() and using a different Storage Level.

On Aug 11, 2016, at 9:48 PM, Gourav Sengupta  wrote:

Hi Ben,

and that will take care of skewed data?

Gourav

On Thu, Aug 11, 2016 at 8:41 PM, Ben Teeuwen  wrote:

When you read both ‘a’ and ‘b’, can you try repartitioning both by column ‘id’?
If you .cache() and .count() to force a shuffle, it'll push the records that
will be joined to the same executors.

So;
a = spark.read.parquet(‘path_to_table_a’).repartition(‘id’).cache()
a.count()

b = spark.read.parquet(‘path_to_table_b’).repartition(‘id’).cache()
b.count()

And then join..

On Aug 8, 2016, at 8:17 PM, Ashic Mahtab  wrote:

Hello,
We have two parquet inputs of the following form:

a: id:String, Name:String  (1.5TB)
b: id:String, Number:Int  (1.3GB)

We need to join these two to get (id, Number, Name). We've tried two approaches:

a.join(b, Seq("id"), "right_outer")

where a and b are dataframes. We also tried taking the rdds, mapping them to
pair rdds with id as the key, and then joining. What we're seeing is that temp
file usage is increasing on the join stage, and filling up our disks, causing
the job to crash. Is there a way to join these two data sets without
well...crashing?

Note, the ids are unique, and there's a one to one mapping between the two
datasets.

Any help would be appreciated.

-Ashic.




  

Spark's Logistic Regression runs unstable on Yarn cluster

2016-08-12 Thread olivierjeunen
I'm using pyspark ML's logistic regression implementation to do some
classification on an AWS EMR Yarn cluster.

The cluster consists of 10 m3.xlarge nodes and is set up as follows:
spark.driver.memory 10g, spark.driver.cores  3 , spark.executor.memory 10g,
spark.executor-cores 4.

I enabled yarn's dynamic allocation abilities.

The problem is that my results are way unstable. Sometimes my application
finishes using 13 executors total, sometimes all of them seem to die and the
application ends up using anywhere between 100 and 200...

Any insight on what could cause this stochastic behaviour would be greatly
appreciated.

The code used to run the logistic regression:

data = spark.read.parquet(storage_path).repartition(80)
lr = LogisticRegression()
lr.setMaxIter(50)
lr.setRegParam(0.063)
evaluator = BinaryClassificationEvaluator()
lrModel = lr.fit(data.filter(data.test == 0))
predictions = lrModel.transform(data.filter(data.test == 1))
auROC = evaluator.evaluate(predictions)
print "auROC on test set: ", auROC
Data is a dataframe of roughly 2.8GB
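
If executors die and dynamic allocation keeps requesting replacements, totals well above the initial handful would be consistent with what you see. Pinning the executor count down is one experiment worth trying (a sketch; the numbers are placeholders):

spark-submit \
  --conf spark.dynamicAllocation.enabled=false \
  --num-executors 13 \
  --executor-memory 10g --executor-cores 4 \
  your_app.py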



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-s-Logistic-Regression-runs-unstable-on-Yarn-cluster-tp27520.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark-2.0.0 fails reading a parquet dataset generated by Spark-1.6.2

2016-08-12 Thread Cheng Lian

OK, I've merged this PR to master and branch-2.0.


On 8/11/16 8:27 AM, Cheng Lian wrote:
Haven't figured out the exactly way how it failed, but the leading 
underscore in the partition directory name looks suspicious. Could you 
please try this PR to see whether it fixes the issue: 
https://github.com/apache/spark/pull/14585/files


Cheng


On 8/9/16 5:38 PM, immerrr again wrote:

Another follow-up: I have narrowed it down to the first 32 partitions,
but from that point it gets strange.

Here's the error:

In [68]: spark.read.parquet(*subdirs[:32])
...
AnalysisException: u'Unable to infer schema for ParquetFormat at
/path/to/data/_locality_code=AQ,/path/to/data/_locality_code=AI. It
must be specified manually;'


Removing *any* of the subdirs from that set removes the error.

In [69]: for i in range(32): spark.read.parquet(*(subdirs[:i] +
subdirs[i+1:32]))


Here's the punchline: schemas for the first 31 and for the last 31 of
those 32 subdirs are the same:

In [70]: spark.read.parquet(*subdirs[:31]).schema.jsonValue() ==
spark.read.parquet(*subdirs[1:32]).schema.jsonValue()
Out[70]: True

Any idea why that might be happening?

On Tue, Aug 9, 2016 at 12:12 PM, immerrr again  
wrote:

Some follow-up information:

- dataset size is ~150G

- the data is partitioned by one of the columns, _locality_code:
$ ls -1
_locality_code=AD
_locality_code=AE
_locality_code=AF
_locality_code=AG
_locality_code=AI
_locality_code=AL
_locality_code=AM
_locality_code=AN

_locality_code=YE
_locality_code=YT
_locality_code=YU
_locality_code=ZA
_locality_code=ZM
_locality_code=ZW
_SUCCESS

- some of the partitions contain only one row, but all partitions are
in place (ie number of directories matches number of distinct
localities
val counts = 
sqlContext.read.parquet("/path-to-data").groupBy("_locality_code").count().orderBy($"count").collect()


scala> counts.slice(counts.length-10, counts.length)
res13: Array[org.apache.spark.sql.Row] = Array([CN,5682255],
[AU,6090561], [ES,6184507], [IT,7093401], [FR,8814435], [CA,10005467],
[UK,15375397], [BR,15829260], [IN,22404143], [US,98585175])

scala> counts.slice(0, 10)
res14: Array[org.apache.spark.sql.Row] = Array([UM,1], [JB,1], [JK,1],
[WP,1], [JT,1], [SX,9], [BL,52], [BQ,70], [BV,115], [MF,115])


On Tue, Aug 9, 2016 at 11:10 AM, immerrr again  
wrote:

Hi everyone

I tried upgrading Spark-1.6.2 to Spark-2.0.0 but run into an issue
reading the existing data. Here's how the traceback looks in
spark-shell:

scala> spark.read.parquet("/path/to/data")
org.apache.spark.sql.AnalysisException: Unable to infer schema for
ParquetFormat at /path/to/data. It must be specified manually;
   at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$16.apply(DataSource.scala:397)
   at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$16.apply(DataSource.scala:397)

   at scala.Option.getOrElse(Option.scala:121)
   at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:396)
   at 
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
   at 
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:427) 

   at 
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:411) 


   ... 48 elided

If I enable DEBUG log with sc.setLogLevel("DEBUG"), here's what I
additionally see in the output:
https://gist.github.com/immerrr/4474021ae70f35b7b9e262251c0abc59. Of
course, that same data is read and processed by spark-1.6.2 correctly.

Any idea what might be wrong here?

Cheers,
immerrr











Re: Spark join and large temp files

2016-08-12 Thread Gourav Sengupta
The point is that if you have skewed data then one single reducer will
finally take a very long time, and you do not need to try this even, just
search in Google and skewed data is a known problem in joins even in SPARK.

Therefore instead of using join, in case the use case permits, just write
a UDF, which then works as a look up. Using broadcast is the SPARK way, and
someone mentioned here the use of Redis, which I remember used to be the
way around in 2011 in the initial days of MR.
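
A rough sketch of that lookup idea, assuming the small side fits comfortably in driver memory (column names as in the thread):

import org.apache.spark.sql.functions.udf

// collect the small (id -> Number) side and broadcast it as a plain map
val small: Map[String, Int] =
  b.rdd.map(r => r.getString(0) -> r.getInt(1)).collectAsMap().toMap
val lookup = sc.broadcast(small)

// the "join" becomes a map-side lookup, so the 1.5TB side is never shuffled
val getNumber = udf((id: String) => lookup.value.get(id))
val joined = a.withColumn("Number", getNumber(a("id")))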

Regards,
Gourav

On Thu, Aug 11, 2016 at 9:24 PM, Ben Teeuwen  wrote:

> Hmm, hashing will probably send all of the records with the same key to
> the same partition / machine.
> I’d try it out, and hope that if you have a few superlarge keys bigger
> than the RAM available of one node, they spill to disk. Maybe play with
> persist() and using a different Storage Level.
>
> On Aug 11, 2016, at 9:48 PM, Gourav Sengupta 
> wrote:
>
> Hi Ben,
>
> and that will take care of skewed data?
>
> Gourav
>
> On Thu, Aug 11, 2016 at 8:41 PM, Ben Teeuwen  wrote:
>
>> When you read both ‘a’ and ‘b', can you try repartitioning both by column
>> ‘id’?
>> If you .cache() and .count() to force a shuffle, it'll push the records
>> that will be joined to the same executors.
>>
>> So;
>> a = spark.read.parquet(‘path_to_table_a’).repartition(‘id’).cache()
>> a.count()
>>
>> b = spark.read.parquet(‘path_to_table_b').repartition(‘id’).cache()
>> b.count()
>>
>> And then join..
>>
>>
>> On Aug 8, 2016, at 8:17 PM, Ashic Mahtab  wrote:
>>
>> Hello,
>> We have two parquet inputs of the following form:
>>
>> a: id:String, Name:String  (1.5TB)
>> b: id:String, Number:Int  (1.3GB)
>>
>> We need to join these two to get (id, Number, Name). We've tried two
>> approaches:
>>
>> a.join(b, Seq("id"), "right_outer")
>>
>> where a and b are dataframes. We also tried taking the rdds, mapping them
>> to pair rdds with id as the key, and then joining. What we're seeing is
>> that temp file usage is increasing on the join stage, and filling up our
>> disks, causing the job to crash. Is there a way to join these two data sets
>> without well...crashing?
>>
>> Note, the ids are unique, and there's a one to one mapping between the
>> two datasets.
>>
>> Any help would be appreciated.
>>
>> -Ashic.
>>
>>
>>
>
>


Re: Log messages for shuffle phase

2016-08-12 Thread Jacek Laskowski
Hi,

Have you looked at web UI? You should find such task metrics.

Jacek

On 11 Aug 2016 6:28 p.m., "Suman Somasundar" 
wrote:

> Hi,
>
>
>
> While going through the logs of an application, I noticed that I could not
> find any logs to dig deeper into any of the shuffle phases.
>
>
>
> I am interested in finding out time taken by each shuffle phase, the size
> of data spilled to disk if any, among other things.
>
>
>
> Does anyone know how I can do this?
>
>
>
> Thanks,
>
> Suman.
>


Re: Losing executors due to memory problems

2016-08-12 Thread Bedrytski Aliaksandr
Hi Vinay,

just out of curiosity, why are you converting your Dataframes into RDDs
before the join? Join works quite well with Dataframes.

As for your problem, it looks like you gave to your executors more
memory than you physically have. As an example of executors
configuration:

> Cluster of 6 nodes, 16 cores/node, 64 ram/node => Gives: 17 executors,
> 19Gb/exec, 5 cores/exec
> No more than 5 cores per exec
> Leave some cores/Ram for the driver

More on the matter here
http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications
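
Also, since the quoted log shows YARN killing the container for exceeding its physical memory limit (15.2 GB of 15.1 GB), leaving more off-heap headroom often helps; as a sketch:

spark-submit ... \
  --executor-memory 8g \
  --conf spark.yarn.executor.memoryOverhead=2048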

--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Fri, Aug 12, 2016, at 01:41, Muttineni, Vinay wrote:
> Hello,
> I have a spark job that basically reads data from two tables into two
> Dataframes which are subsequently converted to RDD's. I, then, join
> them based on a common key.
> Each table is about 10 TB in size but after filtering, the two RDD’s
> are about 500GB each.
> I have 800 executors with 8GB memory per executor.
> Everything works fine until the join stage. But, the join stage is
> throwing the below error.
> I tried increasing the partitions before the join stage but it doesn’t
> change anything.
> Any ideas, how I can fix this and what I might be doing wrong?
> Thanks,
> Vinay
>
> ExecutorLostFailure (executor 208 exited caused by one of the running
> tasks) Reason: Container marked as failed:
> container_1469773002212_96618_01_000246 on host:. Exit status: 143.
> Diagnostics: Container
> [pid=31872,containerID=container_1469773002212_96618_01_000246] is
> running beyond physical memory limits. Current usage: 15.2 GB of 15.1
> GB physical memory used; 15.9 GB of 31.8 GB virtual memory used.
> Killing container.
> Dump of the process-tree for container_1469773002212_96618_01_000246 :
>  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>  |  SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
>  |  FULL_CMD_LINE
>  |- 31883 31872 31872 31872 (java) 519517 41888 17040175104
>  |  3987193 /usr/java/latest/bin/java -server -
>  |  XX:OnOutOfMemoryError=kill %p -Xms14336m -Xmx14336m -
>  |  Djava.io.tmpdir=/hadoop/11/scratch/local/usercacheappcach-
>  |  e/application_1469773002212_96618/container_1469773002212-
>  |  _96618_01_000246/tmp -Dspark.driver.port=32988 -
>  |  Dspark.ui.port=0 -Dspark.akka.frameSize=256 -
>  |  Dspark.yarn.app.container.log.dir=/hadoop/12/scratch/logs-
>  |  /application_1469773002212_96618/container_1469773002212_-
>  |  96618_01_000246 -XX:MaxPermSize=256m
>  |  org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-
>  |  url spark://CoarseGrainedScheduler@10.12.7.4:32988 --executor-
>  |  id 208 –hostname x.com --cores 11 --app-id
>  |  application_1469773002212_96618 --user-class-path
>  |  file:/hadoop/11/scratch/local/usercache /appcache/applica-
>  |  tion_1469773002212_96618/container_1469773002212_96618_01-
>  |  _000246/__app__.jar --user-class-path
>  |  file:/hadoop/11/scratch/local/usercache/ appcache/applica-
>  |  tion_1469773002212_96618/container_1469773002212_96618_01-
>  |  _000246/mysql-connector-java-5.0.8-bin.jar --user-class-
>  |  path file:/hadoop/11/scratch/local/usercache/appcache/app-
>  |  lication_1469773002212_96618/container_1469773002212_9661-
>  |  8_01_000246/datanucleus-core-3.2.10.jar --user-class-path
>  |  file:/hadoop/11/scratch/local/usercache/appcache/applicat-
>  |  ion_1469773002212_96618/container_1469773002212_96618_01_-
>  |  000246/datanucleus-api-jdo-3.2.6.jar --user-class-path fi-
>  |  le:/hadoop/11/scratch/local/usercache/appcache/applicatio-
>  |  n_1469773002212_96618/container_1469773002212_96618_01_00-
>  |  0246/datanucleus-rdbms-3.2.9.jar
>  |- 31872 16580 31872 31872 (bash) 0 0 9146368 267 /bin/bash
>  |  -c LD_LIBRARY_PATH=/apache/hadoop/lib/native:/apache/hado-
>  |  op/lib/native/Linux-amd64-64: /usr/java/latest/bin/java
>  |  -server -XX:OnOutOfMemoryError='kill %p' -Xms14336m -
>  |  Xmx14336m -
>  |  Djava.io.tmpdir=/hadoop/11/scratch/local/usercache/ appca-
>  |  che/application_1469773002212_96618/container_14697730022-
>  |  12_96618_01_000246/tmp '-Dspark.driver.port=32988' '-
>  |  Dspark.ui.port=0' '-Dspark.akka.frameSize=256' -
>  |  Dspark.yarn.app.container.log.dir=/hadoop/12/scratch/logs-
>  |  /application_1469773002212_96618/container_1469773002212_-
>  |  96618_01_000246 -XX:MaxPermSize=256m
>  |  org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-
>  |  url spark://CoarseGrainedScheduler@1.4.1.6:32988 --executor-
>  |  id 208 --hostname x.com --cores 11 --app-id
>  |  application_1469773002212_96618 

Re: Losing executors due to memory problems

2016-08-12 Thread Koert Kuipers
you could have a very large key? perhaps a token value?

i love the rdd api but have found that for joins dataframe/dataset performs
better. maybe you can do the joins there?
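
One way to test the large-key theory before running the full join is to count
rows per key on each side; a sketch in PySpark with placeholder DataFrame and
column names:

    from pyspark.sql.functions import desc

    # a single key with millions of rows (often null, empty string, or a
    # token/default value) is the usual cause of a skewed, spilling join
    df_a.groupBy("key").count().orderBy(desc("count")).show(20, False)
    df_b.groupBy("key").count().orderBy(desc("count")).show(20, False)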

On Thu, Aug 11, 2016 at 7:41 PM, Muttineni, Vinay 
wrote:

> Hello,
>
> I have a spark job that basically reads data from two tables into two
> Dataframes which are subsequently converted to RDD's. I, then, join them
> based on a common key.
>
> Each table is about 10 TB in size but after filtering, the two RDD’s are
> about 500GB each.
>
> I have 800 executors with 8GB memory per executor.
>
> Everything works fine until the join stage. But, the join stage is
> throwing the below error.
>
> I tried increasing the partitions before the join stage but it doesn’t
> change anything.
>
> Any ideas, how I can fix this and what I might be doing wrong?
>
> Thanks,
>
> Vinay
>
>
>
> ExecutorLostFailure (executor 208 exited caused by one of the running
> tasks) Reason: Container marked as failed: 
> container_1469773002212_96618_01_000246
> on host:. Exit status: 143. Diagnostics: Container [pid=31872,containerID=
> container_1469773002212_96618_01_000246] is running beyond physical
> memory limits. Current usage: 15.2 GB of 15.1 GB physical memory used; 15.9
> GB of 31.8 GB virtual memory used. Killing container.
>
> Dump of the process-tree for container_1469773002212_96618_01_000246 :
>
>  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>
>  |- 31883 31872 31872 31872 (java) 519517 41888 17040175104
> 3987193 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p
> -Xms14336m -Xmx14336m -Djava.io.tmpdir=/hadoop/11/scratch/local/
> usercacheappcache/application_1469773002212_96618/container_
> 1469773002212_96618_01_000246/tmp -Dspark.driver.port=32988
> -Dspark.ui.port=0 -Dspark.akka.frameSize=256 -Dspark.yarn.app.container.
> log.dir=/hadoop/12/scratch/logs/application_1469773002212_96618/container_1469773002212_96618_01_000246
> -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend
> --driver-url spark://CoarseGrainedScheduler@10.12.7.4:32988 --executor-id
> 208 –hostname x.com --cores 11 --app-id application_1469773002212_96618
> --user-class-path file:/hadoop/11/scratch/local/usercache
> /appcache/application_1469773002212_96618/container_
> 1469773002212_96618_01_000246/__app__.jar --user-class-path
> file:/hadoop/11/scratch/local/usercache/ appcache/application_
> 1469773002212_96618/container_1469773002212_96618_01_000246/
> mysql-connector-java-5.0.8-bin.jar --user-class-path
> file:/hadoop/11/scratch/local/usercache/appcache/
> application_1469773002212_96618/container_1469773002212_
> 96618_01_000246/datanucleus-core-3.2.10.jar --user-class-path
> file:/hadoop/11/scratch/local/usercache/appcache/
> application_1469773002212_96618/container_1469773002212_
> 96618_01_000246/datanucleus-api-jdo-3.2.6.jar --user-class-path
> file:/hadoop/11/scratch/local/usercache/appcache/
> application_1469773002212_96618/container_1469773002212_
> 96618_01_000246/datanucleus-rdbms-3.2.9.jar
>
>  |- 31872 16580 31872 31872 (bash) 0 0 9146368 267 /bin/bash -c
> LD_LIBRARY_PATH=/apache/hadoop/lib/native:/apache/
> hadoop/lib/native/Linux-amd64-64: /usr/java/latest/bin/java -server
> -XX:OnOutOfMemoryError='kill %p' -Xms14336m -Xmx14336m
> -Djava.io.tmpdir=/hadoop/11/scratch/local/usercache/ appcache/application_
> 1469773002212_96618/container_1469773002212_96618_01_000246/tmp
> '-Dspark.driver.port=32988' '-Dspark.ui.port=0'
> '-Dspark.akka.frameSize=256' -Dspark.yarn.app.container.
> log.dir=/hadoop/12/scratch/logs/application_1469773002212_96618/container_1469773002212_96618_01_000246
> -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend
> --driver-url spark://CoarseGrainedScheduler@1.4.1.6:32988 --executor-id
> 208 --hostname x.com --cores 11 --app-id application_1469773002212_96618
> --user-class-path file:/hadoop/11/scratch/local/usercache/
> appcache/application_1469773002212_96618/container_
> 1469773002212_96618_01_000246/__app__.jar --user-class-path
> file:/hadoop/11/scratch/local/usercache/appcache/
> application_1469773002212_96618/container_1469773002212_
> 96618_01_000246/mysql-connector-java-5.0.8-bin.jar --user-class-path
> file:/hadoop/11/scratch/local/usercache/appcache/
> application_1469773002212_96618/container_1469773002212_
> 96618_01_000246/datanucleus-core-3.2.10.jar --user-class-path
> file:/hadoop/11/scratch/local/usercache/appcache/
> application_1469773002212_96618/container_1469773002212_
> 96618_01_000246/datanucleus-api-jdo-3.2.6.jar --user-class-path
> file:/hadoop/11/scratch/local/usercache/appcache/
> application_1469773002212_96618/container_1469773002212_
> 96618_01_000246/datanucleus-rdbms-3.2.9.jar 1> /hadoop/12/scratch/logs/
> application_1469773002212_96618/container_1469773002212_96618_01_000246/stdout
> 2> 

type inference csv dates

2016-08-12 Thread Koert Kuipers
i generally like the type inference feature of the spark-sql csv
datasource, however i have been stung several times by date inference. the
problem is that when a column is converted to a date type the original data
is lost. this is not a lossless conversion. and i often have a requirement
where i need to preserve the date formatting when writing back out.

so is there a way to disable date inference altogether and treat dates as
strings?

thanks! koert
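
I am not aware of a switch in that version that turns off only date inference
(that part is an assumption), but supplying an explicit schema, or simply
leaving inferSchema off so every column stays a string, sidesteps the lossy
conversion entirely. A sketch against Spark 2.0's built-in csv reader, with
made-up column names:

    from pyspark.sql.types import StructType, StructField, StringType

    schema = StructType([
        StructField("id", StringType()),
        StructField("event_date", StringType()),   # keep the original date text verbatim
        StructField("amount", StringType()),
    ])

    df = (spark.read
          .option("header", "true")
          .schema(schema)          # explicit schema: no inference runs at all
          .csv("path/to/input"))

    # writing back out preserves the original date formatting
    df.write.option("header", "true").csv("path/to/output")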


Re: dataframe row list question

2016-08-12 Thread vr spark
Hi Experts,
 Please suggest

On Thu, Aug 11, 2016 at 7:54 AM, vr spark  wrote:

>
> I have data which is json in this format
>
>  myList: array
>  |||-- elem: struct
>  ||||-- nm: string (nullable = true)
>  ||||-- vList: array (nullable = true)
>  |||||-- element: string (containsNull = true)
>
>
>  from my kafka stream, i created a dataframe using sqlContext.jsonRDD
>  Then registered it with registerTempTable
>  selected mylist from this table and i see this output. It is a list of
> rows
>
> [Row(nm=u'Apt', vList=[u'image']), Row(nm=u'Agent', vList=[u'Mozilla/5.0
> ']), Row(nm=u'Ip', vList=[u'xx.yy.106.25'])]
>
>  My requirement is to get only rows with nm='IP' and its corresponding
> value
> I would need IP, xx.yy.106.25
>
>
> Please suggest
>
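
A minimal PySpark sketch of one way to get there, assuming the schema above and
a DataFrame named df (note the sample rows use 'Ip', not 'IP'):

    from pyspark.sql.functions import explode, col

    # flatten the array of structs, then keep only the entries whose nm is 'Ip'
    entries = df.select(explode(col("myList")).alias("e"))
    ips = entries.filter(col("e.nm") == "Ip").select(col("e.vList").alias("vList"))
    ips.show(truncate=False)

    # or, if the rows have already been collected into a Python list:
    # ip_values = [r.vList for r in row_list if r.nm == 'Ip']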


KafkaUtils.createStream not picking smallest offset

2016-08-12 Thread Diwakar Dhanuskodi

Hi,
We are using Spark 1.6.1 and Kafka 0.9.

KafkaUtils.createStream is showing strange behaviour even though
auto.offset.reset is set to smallest: whenever we need to restart the
stream, it picks up the latest offset, which is not expected.
Do we need to set any other properties?

createDirectStream works fine in the case above.


Sent from Samsung Mobile.
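
With the receiver-based createStream, offsets are committed to ZooKeeper under
the consumer group, and auto.offset.reset only applies when the group has no
committed offsets yet (or they are out of range). On restart the stream resumes
from whatever the group last committed, which with auto-commit on is
effectively the latest position rather than the smallest offset, matching the
behaviour described above. A hedged PySpark sketch of the usual workarounds;
broker and ZooKeeper addresses, topic and group names are placeholders:

    from pyspark.streaming.kafka import KafkaUtils

    # 1) receiver-based stream with a brand-new group id, so no committed
    #    offsets exist and auto.offset.reset=smallest actually takes effect
    stream = KafkaUtils.createStream(
        ssc, "zk-host:2181", "fresh-group-id", {"my_topic": 1},
        kafkaParams={"auto.offset.reset": "smallest"})

    # 2) direct stream (no ZooKeeper-managed offsets), which the thread
    #    already notes works as expected
    direct = KafkaUtils.createDirectStream(
        ssc, ["my_topic"],
        {"metadata.broker.list": "broker1:9092", "auto.offset.reset": "smallest"})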

Re: Spark join and large temp files

2016-08-12 Thread Gourav Sengupta
Hi Ashic,

That is a pretty 2011 way of solving the problem. What is more painful
about this way of working is that you need to load the data into Redis,
keep a Redis cluster running, and, in case you are working across several
clusters, maybe install Redis in all of them or hammer your driver.

Did you try using UDFs on broadcast data? The solution is pretty much the
same, except that instead of Redis you use a broadcast variable, and it
scales wonderfully across several clusters of machines.


Regards,
Gourav Sengupta

On Thu, Aug 11, 2016 at 11:23 PM, Ashic Mahtab  wrote:

> Hi Ben,
> Already tried that. The thing is that any form of shuffle on the big
> dataset (which repartition will cause) puts a node's chunk into /tmp, and
> that fill up disk. I solved the problem by storing the 1.5GB dataset in an
> embedded redis instance on the driver, and doing a straight flatmap of the
> big dataset (doing lookups in redis). This avoids shuffling, and prevents
> the /tmp fill-up issue.
>
> -Ashic.
>
> --
> Subject: Re: Spark join and large temp files
> From: bteeu...@gmail.com
> Date: Thu, 11 Aug 2016 22:24:42 +0200
> CC: user@spark.apache.org
> To: gourav.sengu...@gmail.com
>
>
> Hmm, hashing will probably send all of the records with the same key to
> the same partition / machine.
> I’d try it out, and hope that if you have a few superlarge keys bigger
> than the RAM available of one node, they spill to disk. Maybe play with
> persist() and using a different Storage Level.
>
> On Aug 11, 2016, at 9:48 PM, Gourav Sengupta 
> wrote:
>
> Hi Ben,
>
> and that will take care of skewed data?
>
> Gourav
>
> On Thu, Aug 11, 2016 at 8:41 PM, Ben Teeuwen  wrote:
>
> When you read both ‘a’ and ‘b', can you try repartitioning both by column
> ‘id’?
> If you .cache() and .count() to force a shuffle, it'll push the records
> that will be joined to the same executors.
>
> So;
> a = spark.read.parquet('path_to_table_a').repartition('id').cache()
> a.count()
>
> b = spark.read.parquet('path_to_table_b').repartition('id').cache()
> b.count()
>
> And then join..
>
>
> On Aug 8, 2016, at 8:17 PM, Ashic Mahtab  wrote:
>
> Hello,
> We have two parquet inputs of the following form:
>
> a: id:String, Name:String  (1.5TB)
> b: id:String, Number:Int  (1.3GB)
>
> We need to join these two to get (id, Number, Name). We've tried two
> approaches:
>
> a.join(b, Seq("id"), "right_outer")
>
> where a and b are dataframes. We also tried taking the rdds, mapping them
> to pair rdds with id as the key, and then joining. What we're seeing is
> that temp file usage is increasing on the join stage, and filling up our
> disks, causing the job to crash. Is there a way to join these two data sets
> without well...crashing?
>
> Note, the ids are unique, and there's a one to one mapping between the two
> datasets.
>
> Any help would be appreciated.
>
> -Ashic.
>
>
>
>
>
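
For completeness, the broadcast approach mentioned above, sketched in PySpark;
table paths and column names follow the earlier messages, and it assumes the
~1.3 GB side fits comfortably in driver and executor memory:

    from pyspark.sql.functions import broadcast, udf
    from pyspark.sql.types import IntegerType

    big   = spark.read.parquet("path_to_table_a")    # id, Name   (~1.5 TB)
    small = spark.read.parquet("path_to_table_b")    # id, Number (~1.3 GB)

    # option 1: broadcast join hint; the small side is shipped to every executor,
    # so the big side is never shuffled. With the one-to-one id mapping described
    # earlier, an inner join returns the same rows as the right_outer join.
    joined = big.join(broadcast(small), "id")

    # option 2: broadcast variable plus UDF lookup, the Redis-free variant of the
    # lookup approach. Beware: a Python dict built from a 1.3 GB table can take
    # several times that much memory on the driver and on each executor.
    lookup = spark.sparkContext.broadcast(
        dict(small.rdd.map(lambda r: (r["id"], r["Number"])).collect()))
    number_for = udf(lambda i: lookup.value.get(i), IntegerType())
    joined2 = big.withColumn("Number", number_for(big["id"]))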