spark streaming write orc suddenly slow?

2017-03-27 Thread 446463...@qq.com
Hi All:
when I use Spark Streaming to consume Kafka data and store it in HDFS in ORC
file format, it is fast in the beginning, but it becomes slow several hours later.
environment:
spark version: 2.1.0
kafka version 0.10.1.1
spark-streaming-kafka jar's version: spark-streaming-kafka-0-8-2.11
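
For reference, a minimal sketch of the kind of pipeline described above (Spark 2.1.0
with the spark-streaming-kafka-0-8 connector); the broker, topic and output path are
made-up placeholders, not the actual job:

import kafka.serializer.StringDecoder
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaToOrc {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-to-orc").getOrCreate()
    import spark.implicits._

    val ssc = new StreamingContext(spark.sparkContext, Seconds(60))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // placeholder broker
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))                                // placeholder topic

    // Each micro-batch appends a new set of ORC files under the same directory.
    stream.map(_._2).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        rdd.toDF("value").write.mode(SaveMode.Append).orc("hdfs:///data/events_orc")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

One thing worth checking when such a job degrades over time is whether the output
directory keeps accumulating small files, since every batch appends new ORC files.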



446463...@qq.com


Re: Why selectExpr changes schema (to include id column)?

2017-03-27 Thread Hyukjin Kwon
Thanks for your confirmation.

On 28 Mar 2017 5:02 a.m., "Jacek Laskowski"  wrote:

Hi Hyukjin,

It was a false alarm as I had a local change to `def schema` in
`Dataset` that caused the issue.

I apologize for the noise. Sorry and thanks a lot for the prompt
response. I appreciate.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Mon, Mar 27, 2017 at 2:43 PM, Hyukjin Kwon  wrote:
> I just tried to build against the current master to help check -
> https://github.com/apache/spark/commit/3fbf0a5f9297f438bc92db11f106d4
a0ae568613
>
> It seems I can't reproduce this as below:
>
>
> scala> spark.range(1).printSchema
> root
>  |-- id: long (nullable = false)
>
>
> scala> spark.range(1).selectExpr("*").printSchema
> root
>  |-- id: long (nullable = false)
>
>
> scala> spark.version
> res2: String = 2.2.0-SNAPSHOT
>
>
>
>
> 2017-03-27 17:58 GMT+09:00 Jacek Laskowski :
>>
>> Hi,
>>
>> While toying with selectExpr I've noticed that the schema changes to
>> include id column. I can't seem to explain it. Anyone?
>>
>> scala> spark.range(1).printSchema
>> root
>>  |-- value: long (nullable = true)
>>
>> scala> spark.range(1).selectExpr("*").printSchema
>> root
>>  |-- id: long (nullable = false)
>>
>> p.s. http://stackoverflow.com/q/43041975/1305344
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>


Re: Kafka failover with multiple data centers

2017-03-27 Thread nguyen duc Tuan
Hi Soumitra,
We're working on that. The idea here is to use Kafka to get the brokers'
information for the topic and then use the Kafka client to find the corresponding
offsets on the new cluster (
https://jeqo.github.io/post/2017-01-31-kafka-rewind-consumers-offset/). You
need Kafka >= 0.10.1.0 because it supports the timestamp-based index.
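
For illustration, a rough sketch (not the code we are actually running) of how the
timestamp index can be queried with the 0.10.1+ consumer API to find matching offsets
on the other cluster; the bootstrap servers, topic and timestamp are placeholders:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

def offsetsAtTime(bootstrap: String, topic: String, timestampMs: Long): Map[TopicPartition, Long] = {
  val props = new Properties()
  props.put("bootstrap.servers", bootstrap)
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("group.id", "offset-lookup")

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  try {
    val partitions = consumer.partitionsFor(topic).asScala
      .map(p => new TopicPartition(p.topic(), p.partition()))
    // offsetsForTimes (Kafka >= 0.10.1.0) returns, per partition, the earliest
    // offset whose timestamp is >= the requested one, or null if there is none.
    val query = partitions.map(tp => tp -> java.lang.Long.valueOf(timestampMs)).toMap.asJava
    consumer.offsetsForTimes(query).asScala.collect {
      case (tp, ot) if ot != null => tp -> ot.offset()
    }.toMap
  } finally consumer.close()
}

// e.g. resume on the backup cluster from roughly five minutes before the failover:
// val offsets = offsetsAtTime("backup-broker:9092", "events", System.currentTimeMillis() - 5 * 60 * 1000)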

2017-03-28 5:24 GMT+07:00 Soumitra Johri :

> Hi, did you guys figure it out?
>
> Thanks
> Soumitra
>
> On Sun, Mar 5, 2017 at 9:51 PM nguyen duc Tuan 
> wrote:
>
>> Hi everyone,
>> We are deploying a Kafka cluster for ingesting streaming data. But
>> sometimes, some of the nodes in the cluster have trouble (a node dies, the kafka
>> daemon is killed...). However, recovering data in Kafka can be very slow;
>> it takes several hours to recover from a disaster. I saw a slide deck here
>> suggesting using multiple data centers (https://www.slideshare.net/
>> HadoopSummit/building-largescale-stream-infrastructures-across-
>> multiple-data-centers-with-apache-kafka). But I wonder, how can we
>> detect the problem and switch between data centers in Spark Streaming? Since
>> Kafka 0.10.1 supports a timestamp index, how can we seek to the right offsets?
>> Are there any open-source libraries out there that support handling the
>> problem on the fly?
>> Thanks.
>>
>


Support Stored By Clause

2017-03-27 Thread Denny Lee
Per SPARK-19630, wondering if there are plans to support "STORED BY" clause
for Spark 2.x?

Thanks!


Re: Upgrade the scala code using the most updated Spark version

2017-03-27 Thread vvshvv
it yo



On Jörn Franke , Mar 28, 2017 12:11 AM wrote:

Usually you define the dependencies to the Spark library as provided. You also
seem to mix different Spark versions which should be avoided. The Hadoop library
seems to be outdated and should also only be provided. The other dependencies you
could assemble in a fat jar.

On 27 Mar 2017, at 21:25, Anahita Talebi  wrote:

Hi friends,

I have a code which is written in Scala. The scala version 2.10.4 and Spark
version 1.5.2 are used to run the code. I would like to upgrade the code to the
most updated version of spark, meaning 2.1.0.

Here is the build.sbt:

import AssemblyKeys._

assemblySettings

name := "proxcocoa"

version := "0.1"

scalaVersion := "2.10.4"

parallelExecution in Test := false

{
  val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop")
  libraryDependencies ++= Seq(
    "org.slf4j" % "slf4j-api" % "1.7.2",
    "org.slf4j" % "slf4j-log4j12" % "1.7.2",
    "org.scalatest" %% "scalatest" % "1.9.1" % "test",
    "org.apache.spark" % "spark-core_2.10" % "1.5.2" excludeAll(excludeHadoop),
    "org.apache.spark" % "spark-mllib_2.10" % "1.5.2" excludeAll(excludeHadoop),
    "org.apache.spark" % "spark-sql_2.10" % "1.5.2" excludeAll(excludeHadoop),
    "org.apache.commons" % "commons-compress" % "1.7",
    "commons-io" % "commons-io" % "2.4",
    "org.scalanlp" % "breeze_2.10" % "0.11.2",
    "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly(),
    "com.github.scopt" %% "scopt" % "3.3.0"
  )
}

{
  val defaultHadoopVersion = "1.0.4"
  val hadoopVersion =
    scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", defaultHadoopVersion)
  libraryDependencies += "org.apache.hadoop" % "hadoop-client" % hadoopVersion
}

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.5.0"

resolvers ++= Seq(
  "Local Maven Repository" at Path.userHome.asFile.toURI.toURL + ".m2/repository",
  "Typesafe" at "http://repo.typesafe.com/typesafe/releases",
  "Spray" at "http://repo.spray.cc"
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("javax", "servlet", xs @ _*)           => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith ".html"   => MergeStrategy.first
    case "application.conf"                              => MergeStrategy.concat
    case "reference.conf"                                => MergeStrategy.concat
    case "log4j.properties"                              => MergeStrategy.discard
    case m if m.toLowerCase.endsWith("manifest.mf")      => MergeStrategy.discard
    case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  => MergeStrategy.discard
    case _ => MergeStrategy.first
  }
}

test in assembly := {}

---
I downloaded Spark 2.1.0 and changed the version of spark and scalaVersion in the
build.sbt. But unfortunately, I failed to run the code. Does anybody know how I can
upgrade the code to the most recent spark version by changing the build.sbt file?
Or do you have any other suggestion?

Thanks a lot,
Anahita

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Kafka failover with multiple data centers

2017-03-27 Thread Soumitra Johri
Hi, did you guys figure it out?

Thanks
Soumitra
On Sun, Mar 5, 2017 at 9:51 PM nguyen duc Tuan  wrote:

> Hi everyone,
> We are deploying a Kafka cluster for ingesting streaming data. But
> sometimes, some of the nodes in the cluster have trouble (a node dies, the kafka
> daemon is killed...). However, recovering data in Kafka can be very slow;
> it takes several hours to recover from a disaster. I saw a slide deck here
> suggesting using multiple data centers (
> https://www.slideshare.net/HadoopSummit/building-largescale-stream-infrastructures-across-multiple-data-centers-with-apache-kafka).
> But I wonder, how can we detect the problem and switch between data centers
> in Spark Streaming? Since Kafka 0.10.1 supports a timestamp index, how can we
> seek to the right offsets?
> Are there any open-source libraries out there that support handling the
> problem on the fly?
> Thanks.
>


Re: Spark shuffle files

2017-03-27 Thread Mark Hamstra
When the RDD using them goes out of scope.
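
Illustratively, a rough sketch of what that looks like in practice (not from the
Spark docs; the explicit GC and sleep are only there to make the asynchronous
ContextCleaner activity observable in a local run):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("shuffle-cleanup").getOrCreate()
val sc = spark.sparkContext

var byKey = sc.parallelize(1 to 1000000).map(i => (i % 100, i)).reduceByKey(_ + _)
byKey.count()       // materializes shuffle files under the configured local dirs

byKey = null        // drop the last strong reference to the RDD
System.gc()         // once the driver GCs it, the ContextCleaner asynchronously
Thread.sleep(10000) // asks the executors to remove the now-unreferenced shuffle files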

On Mon, Mar 27, 2017 at 3:13 PM, Ashwin Sai Shankar 
wrote:

> Thanks Mark! Follow-up question: do you know when shuffle files usually
> become un-referenced?
>
> On Mon, Mar 27, 2017 at 2:35 PM, Mark Hamstra 
> wrote:
>
>> Shuffle files are cleaned when they are no longer referenced. See
>> https://github.com/apache/spark/blob/master/core/src/mai
>> n/scala/org/apache/spark/ContextCleaner.scala
>>
>> On Mon, Mar 27, 2017 at 12:38 PM, Ashwin Sai Shankar <
>> ashan...@netflix.com.invalid> wrote:
>>
>>> Hi!
>>>
>>> In spark on yarn, when are shuffle files on local disk removed? (Is it
>>> when the app completes or
>>> once all the shuffle files are fetched or end of the stage?)
>>>
>>> Thanks,
>>> Ashwin
>>>
>>
>>
>


Re: Spark shuffle files

2017-03-27 Thread Ashwin Sai Shankar
Thanks Mark! Follow-up question: do you know when shuffle files usually
become un-referenced?

On Mon, Mar 27, 2017 at 2:35 PM, Mark Hamstra 
wrote:

> Shuffle files are cleaned when they are no longer referenced. See
> https://github.com/apache/spark/blob/master/core/src/
> main/scala/org/apache/spark/ContextCleaner.scala
>
> On Mon, Mar 27, 2017 at 12:38 PM, Ashwin Sai Shankar <
> ashan...@netflix.com.invalid> wrote:
>
>> Hi!
>>
>> In spark on yarn, when are shuffle files on local disk removed? (Is it
>> when the app completes or
>> once all the shuffle files are fetched or end of the stage?)
>>
>> Thanks,
>> Ashwin
>>
>
>


Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust
Ah, I understand what you are asking now.  There is no API for specifying a
kafka specific "decoder", since Spark SQL already has a rich language for
expressing transformations.  The dataframe code I gave will parse the JSON
and materialize it in a class, very similar to what objectMapper.readValue(bytes,
Tweet.class) would do.

However, there are other cases where you might need to do some domain
specific transformation that Spark SQL doesn't support natively.  In this
case you can write a UDF that does the translation. There are a couple of
different ways you can specify this, depending on whether you want to
map/flatMap or just apply the function as a UDF to a single column.
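
For example, a minimal sketch of the UDF route (the decoding shown is only a
placeholder; swap in whatever domain-specific deserialization applies):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().appName("custom-decode").getOrCreate()

// Hypothetical domain-specific decoder: turn the raw Kafka bytes into a string.
// Replace the body with whatever custom deserialization is needed.
val decode = udf { bytes: Array[Byte] =>
  if (bytes == null) null else new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
}

val decoded = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .withColumn("decoded", decode(col("value")))   // apply the UDF to the binary value column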


On Mon, Mar 27, 2017 at 1:59 PM, kaniska Mandal 
wrote:

> yup, that solves the compilation issue :-)
>
> one quick question regarding specifying Decoder in kafka stream:
>
> please note that I am encoding the message as follows while sending data
> to kafka -
>
> 
>
> *String msg = objectMapper.writeValueAsString(tweetEvent);*
>
> *return msg.getBytes();*
>
> I have a corresponding 
>
> *return objectMapper.readValue(bytes, Tweet.class)*
>
>
> *>> how do I specify the Decoder in the following stream-processing flow ?*
> streams = spark
>   .readStream()
>   .format("kafka")
>   .option("kafka.bootstrap.servers", bootstrapServers)
>   .option(subscribeType, topics)
>   .load()
>   .withColumn("message", from_json(col("value").cast("string"),
> tweetSchema)) // cast the binary value to a string and parse it as json
>   .select("message.*") // unnest the json
>   .as(Encoders.bean(Tweet.class))
>
> Thanks
> Kaniska
>
> -
>
> On Mon, Mar 27, 2017 at 1:25 PM, Michael Armbrust 
> wrote:
>
>> You need to import col from org.apache.spark.sql.functions.
>>
>> On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal > > wrote:
>>
>>> Hi Michael,
>>>
>>> Can you please check if I am using correct version of spark-streaming
>>> library as specified in my pom (specified in the email) ?
>>>
>>> col("value").cast("string") - throwing an error 'cannot find symbol
>>> method col(java.lang.String)'
>>> I tried $"value" which results into similar compilation error.
>>>
>>> Thanks
>>> Kaniska
>>>
>>>
>>>
>>> On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 Sorry, I don't think that I understand the question.  Value is just a
 binary blob that we get from kafka and pass to you.  If its stored in JSON,
 I think the code I provided is a good option, but if you are using a
 different encoding you may need to write a UDF.

 On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <
 kaniska.man...@gmail.com> wrote:

> Hi Michael,
>
> Thanks much for the suggestion.
>
> I was wondering - whats the best way to deserialize the 'value' field
>
>
> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <
> mich...@databricks.com> wrote:
>
>> Encoders can only map data into an object if those columns already
>> exist.  When we are reading from Kafka, we just get a binary blob and
>> you'll need to help Spark parse that first.  Assuming your data is stored
>> in JSON it should be pretty straight forward.
>>
>> streams = spark
>>   .readStream()
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>   .option(subscribeType, topics)
>>   .load()
>>   .withColumn("message", from_json(col("value").cast("string"),
>> tweetSchema)) // cast the binary value to a string and parse it as json
>>   .select("message.*") // unnest the json
>>   .as(Encoders.bean(Tweet.class)) // only required if you want to
>> use lambda functions on the data using this class
>>
>> Here is some more info on working with JSON and other
>> semi-structured formats.
>>
>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska 
>> wrote:
>>
>>> Hi,
>>>
>>> Currently , encountering the following exception while working with
>>> below-mentioned code snippet :
>>>
>>> > Please suggest the correct approach for reading the stream into a
>>> sql
>>> > schema.
>>> > If I add 'tweetSchema' while reading stream, it errors out with
>>> message -
>>> > we can not change static schema for kafka.
>>>
>>> 
>>> ---
>>>
>>> *exception*
>>>
>>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
>>> '`location`' given input columns: [topic, timestamp, key, offset,
>>> 

Re: Upgrade the scala code using the most updated Spark version

2017-03-27 Thread Mich Talebzadeh
check these versions

function create_build_sbt_file {
BUILD_SBT_FILE=${GEN_APPSDIR}/scala/${APPLICATION}/build.sbt
[ -f ${BUILD_SBT_FILE} ] && rm -f ${BUILD_SBT_FILE}
cat >> $BUILD_SBT_FILE << !
lazy val root = (project in file(".")).
  settings(
name := "${APPLICATION}",
version := "1.0",
scalaVersion := "2.11.8",
mainClass in Compile := Some("myPackage.${APPLICATION}")
  )
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.0.0" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.0.0" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
"1.6.1" % "provided"
libraryDependencies += "com.google.code.gson" % "gson" % "2.6.2"
libraryDependencies += "org.apache.phoenix" % "phoenix-spark" %
"4.6.0-HBase-1.0"
libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.3"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.3"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.3"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.3"
// META-INF discarding
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
   {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
   }
}
!
}

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 27 March 2017 at 21:45, Jörn Franke  wrote:

> Usually you define the dependencies to the Spark library as provided. You
> also seem to mix different Spark versions which should be avoided.
> The Hadoop library seems to be outdated and should also only be provided.
>
> The other dependencies you could assemble in a fat jar.
>
> On 27 Mar 2017, at 21:25, Anahita Talebi 
> wrote:
>
> Hi friends,
>
> I have a code which is written in Scala. The scala version 2.10.4 and
> Spark version 1.5.2 are used to run the code.
>
> I would like to upgrade the code to the most updated version of spark,
> meaning 2.1.0.
>
> Here is the build.sbt:
>
> import AssemblyKeys._
>
> assemblySettings
>
> name := "proxcocoa"
>
> version := "0.1"
>
> scalaVersion := "2.10.4"
>
> parallelExecution in Test := false
>
> {
>   val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop")
>   libraryDependencies ++= Seq(
> "org.slf4j" % "slf4j-api" % "1.7.2",
> "org.slf4j" % "slf4j-log4j12" % "1.7.2",
> "org.scalatest" %% "scalatest" % "1.9.1" % "test",
> "org.apache.spark" % "spark-core_2.10" % "1.5.2"
> excludeAll(excludeHadoop),
> "org.apache.spark" % "spark-mllib_2.10" % "1.5.2"
> excludeAll(excludeHadoop),
> "org.apache.spark" % "spark-sql_2.10" % "1.5.2"
> excludeAll(excludeHadoop),
> "org.apache.commons" % "commons-compress" % "1.7",
> "commons-io" % "commons-io" % "2.4",
> "org.scalanlp" % "breeze_2.10" % "0.11.2",
> "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly(),
> "com.github.scopt" %% "scopt" % "3.3.0"
>   )
> }
>
> {
>   val defaultHadoopVersion = "1.0.4"
>   val hadoopVersion =
> scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION",
> defaultHadoopVersion)
>   libraryDependencies += "org.apache.hadoop" % "hadoop-client" %
> hadoopVersion
> }
>
> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" %
> "1.5.0"
>
> resolvers ++= Seq(
>   "Local Maven Repository" at Path.userHome.asFile.toURI.toURL +
> ".m2/repository",
>   "Typesafe" at "http://repo.typesafe.com/typesafe/releases",
>   "Spray" at "http://repo.spray.cc"
> )
>
> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>   {
> case PathList("javax", "servlet", xs @ _*)   =>
> MergeStrategy.first
> case PathList(ps @ _*) if ps.last endsWith ".html"   =>
> MergeStrategy.first
> case "application.conf"  =>
> MergeStrategy.concat
> case "reference.conf"=>
> MergeStrategy.concat
> case "log4j.properties"  =>
> MergeStrategy.discard
> case m if m.toLowerCase.endsWith("manifest.mf")  =>
> MergeStrategy.discard
> case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  =>
> MergeStrategy.discard
> case _ => MergeStrategy.first
>   }
> }
>
> test in assembly := {}
>
> 

Re: Spark shuffle files

2017-03-27 Thread Mark Hamstra
Shuffle files are cleaned when they are no longer referenced. See
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ContextCleaner.scala

On Mon, Mar 27, 2017 at 12:38 PM, Ashwin Sai Shankar <
ashan...@netflix.com.invalid> wrote:

> Hi!
>
> In spark on yarn, when are shuffle files on local disk removed? (Is it
> when the app completes or
> once all the shuffle files are fetched or end of the stage?)
>
> Thanks,
> Ashwin
>


Re: Upgrade the scala code using the most updated Spark version

2017-03-27 Thread Jörn Franke
Usually you define the dependencies to the Spark library as provided. You also 
seem to mix different Spark versions which should be avoided.
The Hadoop library seems to be outdated and should also only be provided.

The other dependencies you could assemble in a fat jar.
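
Putting that advice together, a sketch of what the dependency section could look
like for Spark 2.1.0 / Scala 2.11 (versions chosen for illustration; the breeze,
scopt and netlib lines and the assembly settings from the original file would stay):

scalaVersion := "2.11.8"

val sparkVersion = "2.1.0"

libraryDependencies ++= Seq(
  // one consistent Spark version, marked "provided" so it is not packed into the fat jar
  "org.apache.spark" %% "spark-core"      % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"       % sparkVersion % "provided",
  "org.apache.spark" %% "spark-mllib"     % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  // a Hadoop client matching the cluster, also provided
  "org.apache.hadoop" % "hadoop-client"   % "2.7.3"      % "provided"
)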

> On 27 Mar 2017, at 21:25, Anahita Talebi  wrote:
> 
> Hi friends, 
> 
> I have a code which is written in Scala. The scala version 2.10.4 and Spark 
> version 1.5.2 are used to run the code. 
> 
> I would like to upgrade the code to the most updated version of spark, 
> meaning 2.1.0.
> 
> Here is the build.sbt:
> 
> import AssemblyKeys._
> 
> assemblySettings
> 
> name := "proxcocoa"
> 
> version := "0.1"
> 
> scalaVersion := "2.10.4"
> 
> parallelExecution in Test := false
> 
> {
>   val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop")
>   libraryDependencies ++= Seq(
> "org.slf4j" % "slf4j-api" % "1.7.2",
> "org.slf4j" % "slf4j-log4j12" % "1.7.2",
> "org.scalatest" %% "scalatest" % "1.9.1" % "test",
> "org.apache.spark" % "spark-core_2.10" % "1.5.2" 
> excludeAll(excludeHadoop),
> "org.apache.spark" % "spark-mllib_2.10" % "1.5.2" 
> excludeAll(excludeHadoop),
> "org.apache.spark" % "spark-sql_2.10" % "1.5.2" excludeAll(excludeHadoop),
> "org.apache.commons" % "commons-compress" % "1.7",
> "commons-io" % "commons-io" % "2.4",
> "org.scalanlp" % "breeze_2.10" % "0.11.2",
> "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly(),
> "com.github.scopt" %% "scopt" % "3.3.0"
>   )
> }
> 
> {
>   val defaultHadoopVersion = "1.0.4"
>   val hadoopVersion =
> scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", 
> defaultHadoopVersion)
>   libraryDependencies += "org.apache.hadoop" % "hadoop-client" % hadoopVersion
> }
> 
> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.5.0"
> 
> resolvers ++= Seq(
>   "Local Maven Repository" at Path.userHome.asFile.toURI.toURL + 
> ".m2/repository",
>   "Typesafe" at "http://repo.typesafe.com/typesafe/releases",
>   "Spray" at "http://repo.spray.cc"
> )
> 
> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>   {
> case PathList("javax", "servlet", xs @ _*)   => 
> MergeStrategy.first
> case PathList(ps @ _*) if ps.last endsWith ".html"   => 
> MergeStrategy.first
> case "application.conf"  => 
> MergeStrategy.concat
> case "reference.conf"=> 
> MergeStrategy.concat
> case "log4j.properties"  => 
> MergeStrategy.discard
> case m if m.toLowerCase.endsWith("manifest.mf")  => 
> MergeStrategy.discard
> case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  => 
> MergeStrategy.discard
> case _ => MergeStrategy.first
>   }
> }
> 
> test in assembly := {}
> 
> ---
> I downloaded Spark 2.1.0 and changed the version of spark and scalaVersion 
> in the build.sbt. But unfortunately, I failed to run the code. 
> 
> Does anybody know how I can upgrade the code to the most recent spark version 
> by changing the build.sbt file? 
> 
> Or do you have any other suggestion?
> 
> Thanks a lot, 
> Anahita 
> 


Re: unable to stream kafka messages

2017-03-27 Thread kaniska Mandal
yup, that solves the compilation issue :-)

one quick question regarding specifying Decoder in kafka stream:

please note that I am encoding the message as follows while sending data to
kafka -



*String msg = objectMapper.writeValueAsString(tweetEvent);*

*return msg.getBytes();*

I have a corresponding 

*return objectMapper.readValue(bytes, Tweet.class)*


*>> how do I specify the Decoder in the following stream-processing flow ?*
streams = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()
  .withColumn("message", from_json(col("value").cast("string"),
tweetSchema)) // cast the binary value to a string and parse it as json
  .select("message.*") // unnest the json
  .as(Encoders.bean(Tweet.class))

Thanks
Kaniska

-

On Mon, Mar 27, 2017 at 1:25 PM, Michael Armbrust 
wrote:

> You need to import col from org.apache.spark.sql.functions.
>
> On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal 
> wrote:
>
>> Hi Michael,
>>
>> Can you please check if I am using correct version of spark-streaming
>> library as specified in my pom (specified in the email) ?
>>
>> col("value").cast("string") - throwing an error 'cannot find symbol
>> method col(java.lang.String)'
>> I tried $"value" which results into similar compilation error.
>>
>> Thanks
>> Kaniska
>>
>>
>>
>> On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> Sorry, I don't think that I understand the question.  Value is just a
>>> binary blob that we get from kafka and pass to you.  If its stored in JSON,
>>> I think the code I provided is a good option, but if you are using a
>>> different encoding you may need to write a UDF.
>>>
>>> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <
>>> kaniska.man...@gmail.com> wrote:
>>>
 Hi Michael,

 Thanks much for the suggestion.

 I was wondering - whats the best way to deserialize the 'value' field


 On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <
 mich...@databricks.com> wrote:

> Encoders can only map data into an object if those columns already
> exist.  When we are reading from Kafka, we just get a binary blob and
> you'll need to help Spark parse that first.  Assuming your data is stored
> in JSON it should be pretty straight forward.
>
> streams = spark
>   .readStream()
>   .format("kafka")
>   .option("kafka.bootstrap.servers", bootstrapServers)
>   .option(subscribeType, topics)
>   .load()
>   .withColumn("message", from_json(col("value").cast("string"),
> tweetSchema)) // cast the binary value to a string and parse it as json
>   .select("message.*") // unnest the json
>   .as(Encoders.bean(Tweet.class)) // only required if you want to use
> lambda functions on the data using this class
>
> Here is some more info on working with JSON and other semi-structured
> formats.
>
> On Fri, Mar 24, 2017 at 10:49 AM, kaniska 
> wrote:
>
>> Hi,
>>
>> Currently , encountering the following exception while working with
>> below-mentioned code snippet :
>>
>> > Please suggest the correct approach for reading the stream into a
>> sql
>> > schema.
>> > If I add 'tweetSchema' while reading stream, it errors out with
>> message -
>> > we can not change static schema for kafka.
>>
>> 
>> ---
>>
>> *exception*
>>
>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
>> '`location`' given input columns: [topic, timestamp, key, offset,
>> value,
>> timestampType, partition]*;
>> at
>> org.apache.spark.sql.catalyst.analysis.package$AnalysisError
>> At.failAnalysis(package.scala:42)
>> at
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfu
>> n$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis
>> .scala:77)
>> 
>> 
>>
>> *structured streaming code snippet*
>>
>> String bootstrapServers = "localhost:9092";
>> String subscribeType = "subscribe";
>> String topics = "events";
>>
>> StructType tweetSchema = new StructType()
>> .add("tweetId", "string")
>> .add("tweetText", "string")
>> .add("location", "string")
>> .add("timestamp", "string");
>>
>>SparkSession spark = SparkSession
>>

Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust
You need to import col from org.apache.spark.sql.functions.

On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal 
wrote:

> Hi Michael,
>
> Can you please check if I am using correct version of spark-streaming
> library as specified in my pom (specified in the email) ?
>
> col("value").cast("string") - throwing an error 'cannot find symbol
> method col(java.lang.String)'
> I tried $"value" which results into similar compilation error.
>
> Thanks
> Kaniska
>
>
>
> On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust  > wrote:
>
>> Sorry, I don't think that I understand the question.  Value is just a
>> binary blob that we get from kafka and pass to you.  If its stored in JSON,
>> I think the code I provided is a good option, but if you are using a
>> different encoding you may need to write a UDF.
>>
>> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal > > wrote:
>>
>>> Hi Michael,
>>>
>>> Thanks much for the suggestion.
>>>
>>> I was wondering - whats the best way to deserialize the 'value' field
>>>
>>>
>>> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 Encoders can only map data into an object if those columns already
 exist.  When we are reading from Kafka, we just get a binary blob and
 you'll need to help Spark parse that first.  Assuming your data is stored
 in JSON it should be pretty straight forward.

 streams = spark
   .readStream()
   .format("kafka")
   .option("kafka.bootstrap.servers", bootstrapServers)
   .option(subscribeType, topics)
   .load()
   .withColumn("message", from_json(col("value").cast("string"),
 tweetSchema)) // cast the binary value to a string and parse it as json
   .select("message.*") // unnest the json
   .as(Encoders.bean(Tweet.class)) // only required if you want to use
 lambda functions on the data using this class

 Here is some more info on working with JSON and other semi-structured
 formats.

 On Fri, Mar 24, 2017 at 10:49 AM, kaniska 
 wrote:

> Hi,
>
> Currently , encountering the following exception while working with
> below-mentioned code snippet :
>
> > Please suggest the correct approach for reading the stream into a sql
> > schema.
> > If I add 'tweetSchema' while reading stream, it errors out with
> message -
> > we can not change static schema for kafka.
>
> 
> ---
>
> *exception*
>
> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
> '`location`' given input columns: [topic, timestamp, key, offset,
> value,
> timestampType, partition]*;
> at
> org.apache.spark.sql.catalyst.analysis.package$AnalysisError
> At.failAnalysis(package.scala:42)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfu
> n$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
> 
> 
>
> *structured streaming code snippet*
>
> String bootstrapServers = "localhost:9092";
> String subscribeType = "subscribe";
> String topics = "events";
>
> StructType tweetSchema = new StructType()
> .add("tweetId", "string")
> .add("tweetText", "string")
> .add("location", "string")
> .add("timestamp", "string");
>
>SparkSession spark = SparkSession
>   .builder()
>   .appName("StreamProcessor")
>   .config("spark.master", "local")
>   .getOrCreate();
>
>   Dataset streams = spark
>   .readStream()
>   .format("kafka")
>   .option("kafka.bootstrap.servers",
> bootstrapServers)
>   .option(subscribeType, topics)
>   .load()
>   .as(Encoders.bean(Tweet.class));
>
>  streams.createOrReplaceTempView("streamsData");
>
>String sql = "SELECT location,  COUNT(*) as count
> FROM streamsData
> GROUP BY location";
>Dataset countsByLocation = spark.sql(sql);
>
> StreamingQuery query =
> countsByLocation.writeStream()
>   

Re: unable to stream kafka messages

2017-03-27 Thread kaniska Mandal
Hi Michael,

Can you please check if I am using correct version of spark-streaming
library as specified in my pom (specified in the email) ?

col("value").cast("string") - throwing an error 'cannot find symbol method
col(java.lang.String)'
I tried $"value", which results in a similar compilation error.

Thanks
Kaniska



On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust 
wrote:

> Sorry, I don't think that I understand the question.  Value is just a
> binary blob that we get from kafka and pass to you.  If its stored in JSON,
> I think the code I provided is a good option, but if you are using a
> different encoding you may need to write a UDF.
>
> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal 
> wrote:
>
>> Hi Michael,
>>
>> Thanks much for the suggestion.
>>
>> I was wondering - whats the best way to deserialize the 'value' field
>>
>>
>> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> Encoders can only map data into an object if those columns already
>>> exist.  When we are reading from Kafka, we just get a binary blob and
>>> you'll need to help Spark parse that first.  Assuming your data is stored
>>> in JSON it should be pretty straight forward.
>>>
>>> streams = spark
>>>   .readStream()
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>>   .option(subscribeType, topics)
>>>   .load()
>>>   .withColumn("message", from_json(col("value").cast("string"),
>>> tweetSchema)) // cast the binary value to a string and parse it as json
>>>   .select("message.*") // unnest the json
>>>   .as(Encoders.bean(Tweet.class)) // only required if you want to use
>>> lambda functions on the data using this class
>>>
>>> Here is some more info on working with JSON and other semi-structured
>>> formats.
>>>
>>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska 
>>> wrote:
>>>
 Hi,

 Currently , encountering the following exception while working with
 below-mentioned code snippet :

 > Please suggest the correct approach for reading the stream into a sql
 > schema.
 > If I add 'tweetSchema' while reading stream, it errors out with
 message -
 > we can not change static schema for kafka.

 
 ---

 *exception*

 Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
 '`location`' given input columns: [topic, timestamp, key, offset, value,
 timestampType, partition]*;
 at
 org.apache.spark.sql.catalyst.analysis.package$AnalysisError
 At.failAnalysis(package.scala:42)
 at
 org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfu
 n$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
 
 

 *structured streaming code snippet*

 String bootstrapServers = "localhost:9092";
 String subscribeType = "subscribe";
 String topics = "events";

 StructType tweetSchema = new StructType()
 .add("tweetId", "string")
 .add("tweetText", "string")
 .add("location", "string")
 .add("timestamp", "string");

SparkSession spark = SparkSession
   .builder()
   .appName("StreamProcessor")
   .config("spark.master", "local")
   .getOrCreate();

   Dataset streams = spark
   .readStream()
   .format("kafka")
   .option("kafka.bootstrap.servers",
 bootstrapServers)
   .option(subscribeType, topics)
   .load()
   .as(Encoders.bean(Tweet.class));

  streams.createOrReplaceTempView("streamsData");

String sql = "SELECT location,  COUNT(*) as count
 FROM streamsData
 GROUP BY location";
Dataset countsByLocation = spark.sql(sql);

 StreamingQuery query =
 countsByLocation.writeStream()
   .outputMode("complete")
   .format("console")
   .start();

 query.awaitTermination();
 
 --

 *Tweet 

Re: Why selectExpr changes schema (to include id column)?

2017-03-27 Thread Jacek Laskowski
Hi Hyukjin,

It was a false alarm as I had a local change to `def schema` in
`Dataset` that caused the issue.

I apologize for the noise. Sorry and thanks a lot for the prompt
response. I appreciate.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Mon, Mar 27, 2017 at 2:43 PM, Hyukjin Kwon  wrote:
> I just tried to build against the current master to help check -
> https://github.com/apache/spark/commit/3fbf0a5f9297f438bc92db11f106d4a0ae568613
>
> It seems I can't reproduce this as below:
>
>
> scala> spark.range(1).printSchema
> root
>  |-- id: long (nullable = false)
>
>
> scala> spark.range(1).selectExpr("*").printSchema
> root
>  |-- id: long (nullable = false)
>
>
> scala> spark.version
> res2: String = 2.2.0-SNAPSHOT
>
>
>
>
> 2017-03-27 17:58 GMT+09:00 Jacek Laskowski :
>>
>> Hi,
>>
>> While toying with selectExpr I've noticed that the schema changes to
>> include id column. I can't seem to explain it. Anyone?
>>
>> scala> spark.range(1).printSchema
>> root
>>  |-- value: long (nullable = true)
>>
>> scala> spark.range(1).selectExpr("*").printSchema
>> root
>>  |-- id: long (nullable = false)
>>
>> p.s. http://stackoverflow.com/q/43041975/1305344
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark shuffle files

2017-03-27 Thread Ashwin Sai Shankar
Hi!

In spark on yarn, when are shuffle files on local disk removed? (Is it when
the app completes or
once all the shuffle files are fetched or end of the stage?)

Thanks,
Ashwin


Upgrade the scala code using the most updated Spark version

2017-03-27 Thread Anahita Talebi
Hi friends,

I have a code which is written in Scala. The scala version 2.10.4 and Spark
version 1.5.2 are used to run the code.

I would like to upgrade the code to the most updated version of spark,
meaning 2.1.0.

Here is the build.sbt:

import AssemblyKeys._

assemblySettings

name := "proxcocoa"

version := "0.1"

scalaVersion := "2.10.4"

parallelExecution in Test := false

{
  val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop")
  libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-api" % "1.7.2",
"org.slf4j" % "slf4j-log4j12" % "1.7.2",
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.apache.spark" % "spark-core_2.10" % "1.5.2"
excludeAll(excludeHadoop),
"org.apache.spark" % "spark-mllib_2.10" % "1.5.2"
excludeAll(excludeHadoop),
"org.apache.spark" % "spark-sql_2.10" % "1.5.2"
excludeAll(excludeHadoop),
"org.apache.commons" % "commons-compress" % "1.7",
"commons-io" % "commons-io" % "2.4",
"org.scalanlp" % "breeze_2.10" % "0.11.2",
"com.github.fommil.netlib" % "all" % "1.1.2" pomOnly(),
"com.github.scopt" %% "scopt" % "3.3.0"
  )
}

{
  val defaultHadoopVersion = "1.0.4"
  val hadoopVersion =
scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION",
defaultHadoopVersion)
  libraryDependencies += "org.apache.hadoop" % "hadoop-client" %
hadoopVersion
}

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.5.0"

resolvers ++= Seq(
  "Local Maven Repository" at Path.userHome.asFile.toURI.toURL +
".m2/repository",
  "Typesafe" at "http://repo.typesafe.com/typesafe/releases",
  "Spray" at "http://repo.spray.cc"
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
case PathList("javax", "servlet", xs @ _*)   =>
MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".html"   =>
MergeStrategy.first
case "application.conf"  =>
MergeStrategy.concat
case "reference.conf"=>
MergeStrategy.concat
case "log4j.properties"  =>
MergeStrategy.discard
case m if m.toLowerCase.endsWith("manifest.mf")  =>
MergeStrategy.discard
case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  =>
MergeStrategy.discard
case _ => MergeStrategy.first
  }
}

test in assembly := {}

---
I downloaded Spark 2.1.0 and changed the version of spark and
scalaVersion in the build.sbt. But unfortunately, I failed to run the
code.

Does anybody know how I can upgrade the code to the most recent spark
version by changing the build.sbt file?

Or do you have any other suggestion?

Thanks a lot,
Anahita


Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust
Sorry, I don't think that I understand the question.  Value is just a
binary blob that we get from kafka and pass to you.  If it's stored in JSON,
I think the code I provided is a good option, but if you are using a
different encoding you may need to write a UDF.

On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal 
wrote:

> Hi Michael,
>
> Thanks much for the suggestion.
>
> I was wondering - whats the best way to deserialize the 'value' field
>
>
> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust  > wrote:
>
>> Encoders can only map data into an object if those columns already
>> exist.  When we are reading from Kafka, we just get a binary blob and
>> you'll need to help Spark parse that first.  Assuming your data is stored
>> in JSON it should be pretty straight forward.
>>
>> streams = spark
>>   .readStream()
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>   .option(subscribeType, topics)
>>   .load()
>>   .withColumn("message", from_json(col("value").cast("string"),
>> tweetSchema)) // cast the binary value to a string and parse it as json
>>   .select("message.*") // unnest the json
>>   .as(Encoders.bean(Tweet.class)) // only required if you want to use
>> lambda functions on the data using this class
>>
>> Here is some more info on working with JSON and other semi-structured
>> formats.
>>
>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska 
>> wrote:
>>
>>> Hi,
>>>
>>> Currently , encountering the following exception while working with
>>> below-mentioned code snippet :
>>>
>>> > Please suggest the correct approach for reading the stream into a sql
>>> > schema.
>>> > If I add 'tweetSchema' while reading stream, it errors out with
>>> message -
>>> > we can not change static schema for kafka.
>>>
>>> 
>>> ---
>>>
>>> *exception*
>>>
>>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
>>> '`location`' given input columns: [topic, timestamp, key, offset, value,
>>> timestampType, partition]*;
>>> at
>>> org.apache.spark.sql.catalyst.analysis.package$AnalysisError
>>> At.failAnalysis(package.scala:42)
>>> at
>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfu
>>> n$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>>> 
>>> 
>>>
>>> *structured streaming code snippet*
>>>
>>> String bootstrapServers = "localhost:9092";
>>> String subscribeType = "subscribe";
>>> String topics = "events";
>>>
>>> StructType tweetSchema = new StructType()
>>> .add("tweetId", "string")
>>> .add("tweetText", "string")
>>> .add("location", "string")
>>> .add("timestamp", "string");
>>>
>>>SparkSession spark = SparkSession
>>>   .builder()
>>>   .appName("StreamProcessor")
>>>   .config("spark.master", "local")
>>>   .getOrCreate();
>>>
>>>   Dataset streams = spark
>>>   .readStream()
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers",
>>> bootstrapServers)
>>>   .option(subscribeType, topics)
>>>   .load()
>>>   .as(Encoders.bean(Tweet.class));
>>>
>>>  streams.createOrReplaceTempView("streamsData");
>>>
>>>String sql = "SELECT location,  COUNT(*) as count
>>> FROM streamsData
>>> GROUP BY location";
>>>Dataset countsByLocation = spark.sql(sql);
>>>
>>> StreamingQuery query = countsByLocation.writeStream()
>>>   .outputMode("complete")
>>>   .format("console")
>>>   .start();
>>>
>>> query.awaitTermination();
>>> 
>>> --
>>>
>>> *Tweet *
>>>
>>> Tweet.java - has public constructor and getter / setter methods
>>>
>>> public class Tweet implements Serializable{
>>>
>>> private String tweetId;
>>> private String tweetText;
>>> private String location;
>>> private String timestamp;
>>>
>>> public Tweet(){
>>>
>>> }
>>> .
>>>
>>> 
>>> 
>>>
>>> *pom.xml *
>>>
>>>
>>> 
>>> 

Re: How to insert nano seconds in the TimestampType in Spark

2017-03-27 Thread Michael Armbrust
The timestamp type is only microsecond precision.  You would need to store
it on your own (as binary or limited range long or something) if you
require nanosecond precision.
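
For illustration, one form of the "limited range long" idea: keep the full-precision
value as nanoseconds since the epoch in a LongType column and derive a regular
(microsecond) timestamp only where needed. The table and column names below are
hypothetical, and the sketch uses the 2.x SparkSession API for brevity:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().enableHiveSupport().appName("nanos").getOrCreate()

// Hypothetical layout: c_ts_nanos is a LongType column holding nanoseconds since the epoch.
val events = spark.table("testdb.tbl1_nanos")

// The long column keeps all nine fractional digits; the derived TimestampType column
// is only for joins, filters and the built-in date/time functions.
val withTs = events.withColumn(
  "c_ts",
  (col("c_ts_nanos") / 1e9).cast("timestamp"))   // numeric-to-timestamp cast = seconds since epoch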

On Mon, Mar 27, 2017 at 5:29 AM, Devender Yadav <
devender.ya...@impetus.co.in> wrote:

> Hi All,
>
> I am using spark version - 1.6.1
>
> I have a text table in hive having `timestamp` datatype with nanoseconds
> precision.
>
> Hive Table Schema:
>
> c_timestamp timestamp
>
> Hive Table data:
>
> hive> select * from tbl1;
> OK
> 00:00:00.1
> 12:12:12.123456789
> 23:59:59.9
>
> But as per the docs, from Spark 1.5
>
> *Timestamps are now stored at a precision of 1us, rather than 1ns*
>
>
> Sample code:
>
> SparkConf conf = new SparkConf(true).setMaster("
> yarn-cluster").setAppName("SAMPLE_APP");
> SparkContext sc = new SparkContext(conf);
> HiveContext hc = new HiveContext(sc);
> DataFrame df = hc.table("testdb.tbl1");
>
> Data is truncated to microseconds.
>
> 00:00:00
> 12:12:12.123456
> 23:59:59.99
>
>
> Is there any way to use nanoseconds here?
>
>
> Regards,
> Devender
>
>
> --
>
>
>
>
>
>
> NOTE: This message may contain information that is confidential,
> proprietary, privileged or otherwise protected by law. The message is
> intended solely for the named addressee. If received in error, please
> destroy and notify the sender. Any use of this email is prohibited when
> received in error. Impetus does not represent, warrant and/or guarantee,
> that the integrity of this communication has been maintained nor that the
> communication is free of errors, virus, interception or interference.
>


Application kill from UI do not propagate exception

2017-03-27 Thread Noorul Islam K M
Hi all,

I am trying to trap the UI kill event of a Spark application from the driver.
Somehow the exception thrown is not propagated to the driver main
program. See for example the spark-shell session below.

Is there a way to get hold of this event and shut down the driver program?

Regards,
Noorul


spark@spark1:~/spark-2.1.0/sbin$ spark-shell --master
spark://10.29.83.162:7077
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
17/03/23 15:16:47 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where
applicable
17/03/23 15:16:53 WARN ObjectStore: Failed to get database
global_temp, returning NoSuchObjectException
Spark context Web UI available at http://10.29.83.162:4040
Spark context available as 'sc' (master = spark://10.29.83.162:7077,
app id = app-20170323151648-0002).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 17/03/23 15:17:28 ERROR StandaloneSchedulerBackend: Application
has been killed. Reason: Master removed our application: KILLED
17/03/23 15:17:28 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster
scheduler: Master removed our application: KILLED
at
org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:459)
at
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:139)
at
org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:254)
at
org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:168)
at
org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
at
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@25b8f9d2

scala>
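
(A minimal sketch of one way a driver program can at least observe its application
ending and react, using the public SparkListener API. Whether this listener fires
for this particular kill path would need to be verified; the exit call is just an
example action.)

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

def installShutdownWatcher(sc: SparkContext): Unit = {
  sc.addSparkListener(new SparkListener {
    override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit = {
      // Posted when the SparkContext is stopped.
      System.err.println(s"Application ended at ${end.time}, shutting down driver")
      sys.exit(1)
    }
  })
}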



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Application-kill-from-UI-do-not-propagate-exception-tp28539.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



This is a test mail, please ignore!

2017-03-27 Thread Noorul Islam K M
Sending a plain text mail to test whether my mail appears on the list.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/This-is-a-test-mail-please-ignore-tp28538.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark-submit config via file

2017-03-27 Thread Thakrar, Jayesh
Roy - can you check if you have HADOOP_CONF_DIR and YARN_CONF_DIR set to the 
directory containing the HDFS and YARN configuration files?
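
For reference, a sketch of the kind of setup being suggested (paths, host names and
version strings are placeholders to adapt to your cluster):

# environment for the shell that runs spark-submit
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf

# properties.conf -- entries use the spark.* key namespace
spark.executor.memory   3072m
spark.executor.cores    4
# if spark.yarn.archive is set anywhere (e.g. by the HDP defaults), it needs a
# fully qualified URI when the default FS is not HDFS:
spark.yarn.archive      hdfs://<namenode-host>:8020/hdp/apps/<hdp-version>/spark2/spark2-hdp-yarn-archive.tar.gz

spark-submit --class StreamingEventWriterDriver \
  --master yarn --deploy-mode cluster \
  --properties-file properties.conf \
  --files streaming.conf \
  spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf "streaming.conf"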

From: Sandeep Nemuri 
Date: Monday, March 27, 2017 at 9:44 AM
To: Saisai Shao 
Cc: Yong Zhang , ", Roy" , user 

Subject: Re: spark-submit config via file

You should try adding your NN host and port in the URL.

On Mon, Mar 27, 2017 at 11:03 AM, Saisai Shao 
> wrote:
It's quite obvious your hdfs URL is not complete; please look at the 
exception, your hdfs URI doesn't have a host or port. Normally it should be OK if 
HDFS is your default FS.

I think the problem is you're running on HDI, in which the default FS is wasb. So 
here a short name without host:port will lead to an error. This looks like an 
HDI-specific issue, you'd better ask HDI.


Exception in thread "main" java.io.IOException: Incomplete HDFS URI, no host: 
hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz

at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:154)

at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2791)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)

at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2825)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2807)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)





On Fri, Mar 24, 2017 at 9:18 PM, Yong Zhang 
> wrote:

Of course it is possible.



You can always to set any configurations in your application using API, instead 
of pass in through the CLI.



val sparkConf = new 
SparkConf().setAppName(properties.get("appName")).set("master", 
properties.get("master")).set(xxx, properties.get("xxx"))

Your error is your environment problem.

Yong

From: , Roy >
Sent: Friday, March 24, 2017 7:38 AM
To: user
Subject: spark-submit config via file

Hi,

I am trying to deploy a Spark job using spark-submit, which takes a bunch of 
parameters like

spark-submit --class StreamingEventWriterDriver --master yarn --deploy-mode 
cluster --executor-memory 3072m --executor-cores 4 --files streaming.conf 
spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf "streaming.conf"

I was looking for a way to put all these flags in a file to pass to spark-submit 
to make my spark-submit command simpler, like this

spark-submit --class StreamingEventWriterDriver --master yarn --deploy-mode 
cluster --properties-file properties.conf --files streaming.conf 
spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf "streaming.conf"

properties.conf has following contents



spark.executor.memory 3072m

spark.executor.cores 4



But I am getting following error



17/03/24 11:36:26 INFO Client: Use hdfs cache file as spark.yarn.archive for 
HDP, 
hdfsCacheFile:hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz

17/03/24 11:36:26 WARN AzureFileSystemThreadPoolExecutor: Disabling threads for 
Delete operation as thread count 0 is <= 1

17/03/24 11:36:26 INFO AzureFileSystemThreadPoolExecutor: Time taken for Delete 
operation is: 1 ms with threads: 0

17/03/24 11:36:27 INFO Client: Deleted staging directory 
wasb://a...@abc.blob.core.windows.net/user/sshuser/.sparkStaging/application_1488402758319_0492

Exception in thread "main" java.io.IOException: Incomplete HDFS URI, no host: 
hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz

at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:154)

at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2791)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)

at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2825)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2807)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)

at 
org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:364)

at 
org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:480)

at 
org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:552)

at 
org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:881)

at 
org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:170)

at 

Re: spark-submit config via file

2017-03-27 Thread Sandeep Nemuri
You should try adding your NN host and port in the URL.

On Mon, Mar 27, 2017 at 11:03 AM, Saisai Shao 
wrote:

> It's quite obvious your hdfs URL is not complete; please look at the
> exception, your hdfs URI doesn't have a host or port. Normally it should be OK
> if HDFS is your default FS.
>
> I think the problem is you're running on HDI, in which the default FS is wasb.
> So here a short name without host:port will lead to an error. This looks like
> an HDI-specific issue, you'd better ask HDI.
>
> Exception in thread "main" java.io.IOException: Incomplete HDFS URI, no
> host: hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz
>
> at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(Dist
> ributedFileSystem.java:154)
>
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.
> java:2791)
>
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
>
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem
> .java:2825)
>
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2807)
>
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)
>
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
>
>
>
>
> On Fri, Mar 24, 2017 at 9:18 PM, Yong Zhang  wrote:
>
>> Of course it is possible.
>>
>>
>> You can always to set any configurations in your application using API,
>> instead of pass in through the CLI.
>>
>>
>> val sparkConf = new SparkConf().setAppName(properties.get("appName")
>> ).set("master", properties.get("master")).set(xxx, properties.get("xxx"))
>>
>> Your error is your environment problem.
>>
>> Yong
>> --
>> *From:* , Roy 
>> *Sent:* Friday, March 24, 2017 7:38 AM
>> *To:* user
>> *Subject:* spark-submit config via file
>>
>> Hi,
>>
>> I am trying to deploy a Spark job using spark-submit, which takes a bunch of
>> parameters like
>>
>> spark-submit --class StreamingEventWriterDriver --master yarn
>> --deploy-mode cluster --executor-memory 3072m --executor-cores 4 --files
>> streaming.conf spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf
>> "streaming.conf"
>>
>> I was looking for a way to put all these flags in a file to pass to
>> spark-submit to make my spark-submit command simpler, like this
>>
>> spark-submit --class StreamingEventWriterDriver --master yarn
>> --deploy-mode cluster --properties-file properties.conf --files
>> streaming.conf spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf
>> "streaming.conf"
>>
>> properties.conf has following contents
>>
>>
>> spark.executor.memory 3072m
>>
>> spark.executor.cores 4
>>
>>
>> But I am getting following error
>>
>>
>> 17/03/24 11:36:26 INFO Client: Use hdfs cache file as spark.yarn.archive
>> for HDP, hdfsCacheFile:hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-
>> hdp-yarn-archive.tar.gz
>>
>> 17/03/24 11:36:26 WARN AzureFileSystemThreadPoolExecutor: Disabling
>> threads for Delete operation as thread count 0 is <= 1
>>
>> 17/03/24 11:36:26 INFO AzureFileSystemThreadPoolExecutor: Time taken for
>> Delete operation is: 1 ms with threads: 0
>>
>> 17/03/24 11:36:27 INFO Client: Deleted staging directory wasb://
>> a...@abc.blob.core.windows.net/user/sshuser/.sparkStag
>> ing/application_1488402758319_0492
>>
>> Exception in thread "main" java.io.IOException: Incomplete HDFS URI, no
>> host: hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz
>>
>> at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(Dist
>> ributedFileSystem.java:154)
>>
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.
>> java:2791)
>>
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
>>
>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem
>> .java:2825)
>>
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:
>> 2807)
>>
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)
>>
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
>>
>> at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.
>> scala:364)
>>
>> at org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$
>> yarn$Client$$distribute$1(Client.scala:480)
>>
>> at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Cl
>> ient.scala:552)
>>
>> at org.apache.spark.deploy.yarn.Client.createContainerLaunchCon
>> text(Client.scala:881)
>>
>> at org.apache.spark.deploy.yarn.Client.submitApplication(Client
>> .scala:170)
>>
>> at org.apache.spark.deploy.yarn.Client.run(Client.scala:1218)
>>
>> at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1277)
>>
>> at org.apache.spark.deploy.yarn.Client.main(Client.scala)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>>

Re: Why selectExpr changes schema (to include id column)?

2017-03-27 Thread Hyukjin Kwon
I just tried to build against the current master to help check -
https://github.com/apache/spark/commit/3fbf0a5f9297f438bc92db11f106d4a0ae568613

It seems I can't reproduce this as below:


scala> spark.range(1).printSchema
root
 |-- id: long (nullable = false)


scala> spark.range(1).selectExpr("*").printSchema
root
 |-- id: long (nullable = false)


scala> spark.version
res2: String = 2.2.0-SNAPSHOT




2017-03-27 17:58 GMT+09:00 Jacek Laskowski :

> Hi,
>
> While toying with selectExpr I've noticed that the schema changes to
> include id column. I can't seem to explain it. Anyone?
>
> scala> spark.range(1).printSchema
> root
>  |-- value: long (nullable = true)
>
> scala> spark.range(1).selectExpr("*").printSchema
> root
>  |-- id: long (nullable = false)
>
> p.s. http://stackoverflow.com/q/43041975/1305344
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


How to insert nano seconds in the TimestampType in Spark

2017-03-27 Thread Devender Yadav
Hi All,

I am using spark version - 1.6.1

I have a text table in Hive with a `timestamp` column that has nanosecond
precision.

Hive Table Schema:

c_timestamp timestamp

Hive Table data:

hive> select * from tbl1;
OK
00:00:00.1
12:12:12.123456789
23:59:59.9

But as per the docs, since Spark 1.5:

Timestamps are now stored at a precision of 1us, rather than 1ns

Sample code:

SparkConf conf = new 
SparkConf(true).setMaster("yarn-cluster").setAppName("SAMPLE_APP");
SparkContext sc = new SparkContext(conf);
HiveContext hc = new HiveContext(sc);
DataFrame df = hc.table("testdb.tbl1");

Data is truncated to microseconds.

00:00:00
12:12:12.123456
23:59:59.99


Is there any way to use nanoseconds here?


Regards,
Devender
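
Spark's TimestampType tops out at microsecond precision, so there is no way to
keep nanoseconds in that column type in 1.6.1. One possible workaround is
sketched below; it is only a sketch, and the table/column names (tbl1_str,
c_timestamp_str) are assumed, not taken from the thread. The idea is to expose
the raw value as a STRING on the Hive side (for example by declaring the text
table's column as STRING), because once the value has passed through Spark's
TimestampType the nanoseconds are already gone, and then split the seconds and
the fractional part yourself:

import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;
import static org.apache.spark.sql.functions.*;

// Assumes c_timestamp_str holds the full value as text, e.g.
// "2017-03-27 12:12:12.123456789" (a time-only string would not cast cleanly).
DataFrame df = hc.table("testdb.tbl1_str");
Column ts = df.col("c_timestamp_str");

// Fractional part after the last '.', or "0" when there is none.
Column fraction = when(instr(ts, ".").gt(0), substring_index(ts, ".", -1))
        .otherwise("0");

DataFrame withNanos = df
        // ordinary timestamp, truncated to microseconds as Spark always does
        .withColumn("c_ts_micro", ts.cast("timestamp"))
        // fractional part right-padded to 9 digits => nanoseconds as a long
        .withColumn("c_nanos", rpad(fraction, 9, "0").cast("long"));

The microsecond column can then be used for ordinary comparisons and grouping,
while c_nanos carries the full precision alongside it.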











apache-spark: Converting List of Rows into Dataset Java

2017-03-27 Thread Karin Valisova
Hello!

I am running Spark with Java and have bumped into a problem I can't solve; I
couldn't find anything helpful among answered questions either, so I would
really appreciate your help.

I am running some calculations, creating rows for each result:

List<Row> results = new LinkedList<>();

for (something) {
    results.add(RowFactory.create(someStringVariable, someIntegerVariable));
}

Now I have ended up with a list of Rows that I need to turn into a DataFrame
to perform some Spark SQL operations on, like grouping and sorting. I would
like to keep the data types.

I tried:

Dataset<Row> toShow = spark.createDataFrame(results, Row.class);

but it throws a NullPointerException (spark being the SparkSession). Is my
logic wrong somewhere, or should this operation be possible and give me what
I want?
Or do I have to create a custom class that implements Serializable and build
a list of those objects rather than Rows? Would I be able to perform SQL
queries on a Dataset consisting of custom class objects rather than Rows?

I'm sorry if this is a duplicate question.
Thank you for your help!

Karin
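
For what it's worth, a minimal sketch of the usual approach: Row carries no
schema and is not a JavaBean, which is likely why the bean-reflection overload
createDataFrame(results, Row.class) fails here. The schema-based overload
createDataFrame(List<Row>, StructType) takes the column names and types
explicitly (the names and values below are placeholders, not from the thread):

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

SparkSession spark = SparkSession.builder().appName("rows-to-dataset").getOrCreate();

// Declare the schema explicitly so the data types are kept.
StructType schema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("label", DataTypes.StringType, true),
        DataTypes.createStructField("value", DataTypes.IntegerType, true)));

List<Row> results = new LinkedList<>();
results.add(RowFactory.create("someString", 42));   // same shape as the rows built above

// Schema-based overload: no bean reflection involved.
Dataset<Row> toShow = spark.createDataFrame(results, schema);
toShow.groupBy("label").count().orderBy("label").show();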


Why selectExpr changes schema (to include id column)?

2017-03-27 Thread Jacek Laskowski
Hi,

While toying with selectExpr I've noticed that the schema changes to
include id column. I can't seem to explain it. Anyone?

scala> spark.range(1).printSchema
root
 |-- value: long (nullable = true)

scala> spark.range(1).selectExpr("*").printSchema
root
 |-- id: long (nullable = false)

p.s. http://stackoverflow.com/q/43041975/1305344

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark-submit config via file

2017-03-27 Thread Saisai Shao
It's quite obvious your HDFS URL is not complete; please look at the
exception: your HDFS URI doesn't have a host or port. Normally that would be
fine if HDFS were your default FS.

I think the problem is that you're running on HDI, where the default FS is
wasb, so a short hdfs:/// URI without host:port leads to this error. This
looks like an HDI-specific issue; you'd better ask the HDI team.

Exception in thread "main" java.io.IOException: Incomplete HDFS URI, no
host: hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz

at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(
DistributedFileSystem.java:154)

at org.apache.hadoop.fs.FileSystem.createFileSystem(
FileSystem.java:2791)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)

at org.apache.hadoop.fs.FileSystem$Cache.getInternal(
FileSystem.java:2825)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2807)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
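
One possible workaround, a sketch only: spell the archive URI out fully in the
same properties.conf so the short, host-less hdfs:/// form is never used. The
namenode host and port below are placeholders and assume the archive really
exists at that HDFS path; on HDI it may be simpler to point spark.yarn.archive
at a wasb:// location on the default filesystem instead.

spark.executor.memory  3072m
spark.executor.cores   4
# fully-qualified URI instead of the host-less hdfs:/// form
spark.yarn.archive     hdfs://<namenode-host>:8020/hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz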




On Fri, Mar 24, 2017 at 9:18 PM, Yong Zhang  wrote:

> Of course it is possible.
>
>
> You can always set configurations in your application through the API,
> instead of passing them in through the CLI.
>
>
> val sparkConf = new SparkConf().setAppName(properties.get("appName"))
>   .setMaster(properties.get("master")).set(xxx, properties.get("xxx"))
>
> Your error is your environment problem.
>
> Yong
> --
> *From:* , Roy 
> *Sent:* Friday, March 24, 2017 7:38 AM
> *To:* user
> *Subject:* spark-submit config via file
>
> Hi,
>
> I am trying to deploy spark job by using spark-submit which has bunch of
> parameters like
>
> spark-submit --class StreamingEventWriterDriver --master yarn
> --deploy-mode cluster --executor-memory 3072m --executor-cores 4 --files
> streaming.conf spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf
> "streaming.conf"
>
> I was looking for a way to put all these flags in a file to pass to
> spark-submit, to make my spark-submit command simpler, like this
>
> spark-submit --class StreamingEventWriterDriver --master yarn
> --deploy-mode cluster --properties-file properties.conf --files
> streaming.conf spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf
> "streaming.conf"
>
> properties.conf has following contents
>
>
> spark.executor.memory 3072m
>
> spark.executor.cores 4
>
>
> But I am getting following error
>
>
> 17/03/24 11:36:26 INFO Client: Use hdfs cache file as spark.yarn.archive
> for HDP, hdfsCacheFile:hdfs:///hdp/apps/2.6.0.0-403/spark2/
> spark2-hdp-yarn-archive.tar.gz
>
> 17/03/24 11:36:26 WARN AzureFileSystemThreadPoolExecutor: Disabling
> threads for Delete operation as thread count 0 is <= 1
>
> 17/03/24 11:36:26 INFO AzureFileSystemThreadPoolExecutor: Time taken for
> Delete operation is: 1 ms with threads: 0
>
> 17/03/24 11:36:27 INFO Client: Deleted staging directory wasb://
> a...@abc.blob.core.windows.net/user/sshuser/.sparkStaging/application_
> 1488402758319_0492
>
> Exception in thread "main" java.io.IOException: Incomplete HDFS URI, no
> host: hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz
>
> at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(
> DistributedFileSystem.java:154)
>
> at org.apache.hadoop.fs.FileSystem.createFileSystem(
> FileSystem.java:2791)
>
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
>
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(
> FileSystem.java:2825)
>
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2807)
>
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)
>
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
>
> at org.apache.spark.deploy.yarn.Client.copyFileToRemote(
> Client.scala:364)
>
> at org.apache.spark.deploy.yarn.Client.org$apache$spark$
> deploy$yarn$Client$$distribute$1(Client.scala:480)
>
> at org.apache.spark.deploy.yarn.Client.prepareLocalResources(
> Client.scala:552)
>
> at org.apache.spark.deploy.yarn.Client.
> createContainerLaunchContext(Client.scala:881)
>
> at org.apache.spark.deploy.yarn.Client.submitApplication(
> Client.scala:170)
>
> at org.apache.spark.deploy.yarn.Client.run(Client.scala:1218)
>
> at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1277)
>
> at org.apache.spark.deploy.yarn.Client.main(Client.scala)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
> deploy$SparkSubmit$$runMain(SparkSubmit.scala:745)
>
>