Accumulator param not combining values

2016-04-15 Thread Connie Chen
Hi all,

I wrote an AccumulatorParam, but in my job it does not seem to be adding the
values. When I tried with an Int accumulator in the same job, the values were
added correctly.

object MapAccumulatorParam extends AccumulatorParam[Map[Long, Int]] {
  def zero(initialValue: Map[Long, Int] = Map.empty): Map[Long, Int] = {
    Map.empty
  }
  override def addInPlace(m1: Map[Long, Int], m2: Map[Long, Int]): Map[Long, Int] = {
    val res = m1.map {
      case (k, v) => k -> (v + m2.getOrElse(k, 0))
    } ++ m2
    println(res)
    res
  }
  override def addAccumulator(t1: Map[Long, Int], t2: Map[Long, Int]): Map[Long, Int] = {
    val res = t1.map {
      case (k, v) => k -> (v + t2.getOrElse(k, 0))
    } ++ t2
    println(res)
    res
  }
}


To use it I am doing something like this:

val acc1 = sparkEnv.sc.accumulator(Map.empty[Long, Int])(MapAccumulatorParam)
val acc2 = sparkEnv.sc.accumulator(Map.empty[Long, Int])(MapAccumulatorParam)
new SparkJob(sparkEnv, parsedArgs, unpricedCallsAcc, acc1, acc2).runJob() match { ...


Where sparkEnv is a wrapper for both SQLContext and SparkContext. Is there
any reason my values might not be getting added together? Did I initialize
the accumulators in the wrong place (it is where my Spark contexts get
created)? When I step through the function I see that I get back a Map(1 ->
1) when I do

acc1.add(Map(1 -> 1))


but it never starts the add with a nonempty Map. Any ideas? Thanks in
advance.
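
For comparison, here is a minimal, self-contained sketch that does combine values (assuming Spark 1.x; the names are illustrative). Two things worth noting: the `++ m2` in the posted addInPlace lets m2's raw values overwrite the summed values for keys present in both maps, and an accumulator's value is only reliably visible on the driver after an action has completed, so reading it mid-job will show the zero value.

import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

// Illustrative variant: values for keys present in both maps are summed.
object SummingMapParam extends AccumulatorParam[Map[Long, Int]] {
  def zero(initialValue: Map[Long, Int]): Map[Long, Int] = Map.empty
  def addInPlace(m1: Map[Long, Int], m2: Map[Long, Int]): Map[Long, Int] =
    (m1.keySet ++ m2.keySet).map { k =>
      k -> (m1.getOrElse(k, 0) + m2.getOrElse(k, 0))
    }.toMap
}

object MapAccumulatorExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("map-accumulator").setMaster("local[2]"))
    val acc = sc.accumulator(Map.empty[Long, Int])(SummingMapParam)

    // Updates happen on the executors, but they are only merged back into
    // acc.value on the driver once the action (foreach here) has finished.
    sc.parallelize(Seq(1L, 1L, 2L), 2).foreach(id => acc.add(Map(id -> 1)))
    println(acc.value) // expected: Map(1 -> 2, 2 -> 1)

    sc.stop()
  }
}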


Re: Shuffle service fails to register driver - Spark - Mesos

2016-04-15 Thread Jo Voordeckers
Never mind, I just figured out my problem:

I was running: *deploy.ExternalShuffleService* instead of
*deploy.mesos.MesosExternalShuffleService*

- Jo Voordeckers


On Fri, Apr 15, 2016 at 2:29 PM, Jo Voordeckers 
wrote:

> Forgot to mention we're running Spark (Streaming) 1.5.1
>
> - Jo Voordeckers
>
>
> On Fri, Apr 15, 2016 at 12:21 PM, Jo Voordeckers  > wrote:
>
>> Hi all,
>>
>> I've got mesos in coarse grained mode with dyn alloc, shuffle service
>> enabled and am running the shuffle service on every mesos slave.
>>
>> I'm assuming I misconfigured something on the scheduler service, any
>> ideas?
>>
>> On my driver I see a few of these, I guess it's one for every executor:
>>
>> 19:12:29 WARN [shuffle-client-1] MesosExternalShuffleClient - Unable to
>>> register app 78a944c9-3a89-4334-bca3-7108aadb1798- with external
>>> shuffle service. Please manually remove shuffle data after driver exit.
>>> Error: java.lang.RuntimeException: java.lang.UnsupportedOperationException:
>>> Unexpected message:
>>> org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver@c399c59a
>>> at
>>> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:92)
>>> at
>>> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:68)
>>> at
>>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>>> at
>>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>>> at
>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>>> at
>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>>> at
>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>>>
>> [...]
>>
>> In the scheduler service I see logs like this on every box:
>>
>> log4j:WARN No appenders could be found for logger
>>> (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
>>> log4j:WARN Please initialize the log4j system properly.
>>> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
>>> for more info.
>>> Using Spark's repl log4j profile:
>>> org/apache/spark/log4j-defaults-repl.properties
>>> To adjust logging level use sc.setLogLevel("INFO")
>>> 16/04/14 19:12:29 ERROR TransportRequestHandler: Error while invoking
>>> RpcHandler#receive() on RPC id 7280403447531815366
>>> java.lang.UnsupportedOperationException: Unexpected message:
>>> org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver@c399c59a
>>> at
>>> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:92)
>>> at
>>> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:68)
>>> at
>>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>>> at
>>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>>> at
>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>>> at
>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>>> at
>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>>>
>> [...]
>>
>> Thanks!
>>
>> - Jo Voordeckers
>>
>>
>


Re: can spark-csv package accept strings instead of files?

2016-04-15 Thread Benjamin Kim
Thanks!

I got this to work.

val csvRdd = sc.parallelize(data.split("\n"))
val df = new com.databricks.spark.csv.CsvParser()
  .withUseHeader(true)
  .withInferSchema(true)
  .csvRdd(sqlContext, csvRdd)
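
For completeness, the same thing as a self-contained sketch (assuming the spark-csv 1.x artifact is on the classpath; the sample data is made up):

import com.databricks.spark.csv.CsvParser
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("csv-from-strings").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)

// A made-up CSV payload held in memory instead of on disk.
val data = "id,name\n1,alice\n2,bob"

// Split into lines, parallelize, and hand the RDD[String] to spark-csv's parser.
val csvRdd = sc.parallelize(data.split("\n"))
val df = new CsvParser()
  .withUseHeader(true)
  .withInferSchema(true)
  .csvRdd(sqlContext, csvRdd)

df.printSchema()
df.show()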

> On Apr 15, 2016, at 1:14 PM, Hyukjin Kwon  wrote:
> 
> Hi,
> 
> Would you try the code below?
> 
> val csvRDD = ...your processing for the csv rdd..
> val df = new CsvParser().csvRdd(sqlContext, csvRDD, useHeader = true)
> 
> Thanks!
> 
> On 16 Apr 2016 1:35 a.m., "Benjamin Kim"  > wrote:
> Hi Hyukjin,
> 
> I saw that. I don’t know how to use it. I’m still learning Scala on my own. 
> Can you help me to start?
> 
> Thanks,
> Ben
> 
>> On Apr 15, 2016, at 8:02 AM, Hyukjin Kwon > > wrote:
>> 
>> I hope it was not too late :).
>> 
>> It is possible.
>> 
>> Please check csvRdd api here, 
>> https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150
>>  
>> .
>> 
>> Thanks!
>> 
>> On 2 Apr 2016 2:47 a.m., "Benjamin Kim" > > wrote:
>> Does anyone know if this is possible? I have an RDD loaded with rows of CSV 
>> data strings. Each string representing the header row and multiple rows of 
>> data along with delimiters. I would like to feed each thru a CSV parser to 
>> convert the data into a dataframe and, ultimately, UPSERT a Hive/HBase table 
>> with this data.
>> 
>> Please let me know if you have any ideas.
>> 
>> Thanks,
>> Ben
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> 
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> 
>> 
> 



Re: Shuffle service fails to register driver - Spark - Mesos

2016-04-15 Thread Jo Voordeckers
Forgot to mention we're running Spark (Streaming) 1.5.1

- Jo Voordeckers


On Fri, Apr 15, 2016 at 12:21 PM, Jo Voordeckers 
wrote:

> Hi all,
>
> I've got mesos in coarse grained mode with dyn alloc, shuffle service
> enabled and am running the shuffle service on every mesos slave.
>
> I'm assuming I misconfigured something on the scheduler service, any ideas?
>
> On my driver I see a few of these, I guess it's one for every executor:
>
> 19:12:29 WARN [shuffle-client-1] MesosExternalShuffleClient - Unable to
>> register app 78a944c9-3a89-4334-bca3-7108aadb1798- with external
>> shuffle service. Please manually remove shuffle data after driver exit.
>> Error: java.lang.RuntimeException: java.lang.UnsupportedOperationException:
>> Unexpected message:
>> org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver@c399c59a
>> at
>> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:92)
>> at
>> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:68)
>> at
>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>> at
>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>> at
>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>>
> [...]
>
> In the scheduler service I see logs like this on every box:
>
> log4j:WARN No appenders could be found for logger
>> (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
>> log4j:WARN Please initialize the log4j system properly.
>> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
>> more info.
>> Using Spark's repl log4j profile:
>> org/apache/spark/log4j-defaults-repl.properties
>> To adjust logging level use sc.setLogLevel("INFO")
>> 16/04/14 19:12:29 ERROR TransportRequestHandler: Error while invoking
>> RpcHandler#receive() on RPC id 7280403447531815366
>> java.lang.UnsupportedOperationException: Unexpected message:
>> org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver@c399c59a
>> at
>> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:92)
>> at
>> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:68)
>> at
>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>> at
>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>> at
>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>>
> [...]
>
> Thanks!
>
> - Jo Voordeckers
>
>


Re: can spark-csv package accept strings instead of files?

2016-04-15 Thread Benjamin Kim
Is this right?


import com.databricks.spark.csv

val csvRdd = data.flatMap(x => x.split("\n"))
val df = new CsvParser().csvRdd(sqlContext, csvRdd, useHeader = true)

Thanks,
Ben


> On Apr 15, 2016, at 1:14 PM, Hyukjin Kwon  wrote:
> 
> Hi,
> 
> Would you try the code below?
> 
> val csvRDD = ...your processing for the csv rdd..
> val df = new CsvParser().csvRdd(sqlContext, csvRDD, useHeader = true)
> 
> Thanks!
> 
> On 16 Apr 2016 1:35 a.m., "Benjamin Kim"  > wrote:
> Hi Hyukjin,
> 
> I saw that. I don’t know how to use it. I’m still learning Scala on my own. 
> Can you help me to start?
> 
> Thanks,
> Ben
> 
>> On Apr 15, 2016, at 8:02 AM, Hyukjin Kwon > > wrote:
>> 
>> I hope it was not too late :).
>> 
>> It is possible.
>> 
>> Please check csvRdd api here, 
>> https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150
>>  
>> .
>> 
>> Thanks!
>> 
>> On 2 Apr 2016 2:47 a.m., "Benjamin Kim" > > wrote:
>> Does anyone know if this is possible? I have an RDD loaded with rows of CSV 
>> data strings. Each string representing the header row and multiple rows of 
>> data along with delimiters. I would like to feed each thru a CSV parser to 
>> convert the data into a dataframe and, ultimately, UPSERT a Hive/HBase table 
>> with this data.
>> 
>> Please let me know if you have any ideas.
>> 
>> Thanks,
>> Ben
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> 
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> 
>> 
> 



Re: can spark-csv package accept strings instead of files?

2016-04-15 Thread Hyukjin Kwon
Hi,

Would you try the code below?

val csvRDD = ...your processing for the csv rdd..
val df = new CsvParser().csvRdd(sqlContext, csvRDD, useHeader = true)

Thanks!
On 16 Apr 2016 1:35 a.m., "Benjamin Kim"  wrote:

> Hi Hyukjin,
>
> I saw that. I don’t know how to use it. I’m still learning Scala on my
> own. Can you help me to start?
>
> Thanks,
> Ben
>
> On Apr 15, 2016, at 8:02 AM, Hyukjin Kwon  wrote:
>
> I hope it was not too late :).
>
> It is possible.
>
> Please check csvRdd api here,
> https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150
> .
>
> Thanks!
> On 2 Apr 2016 2:47 a.m., "Benjamin Kim"  wrote:
>
>> Does anyone know if this is possible? I have an RDD loaded with rows of
>> CSV data strings. Each string representing the header row and multiple rows
>> of data along with delimiters. I would like to feed each thru a CSV parser
>> to convert the data into a dataframe and, ultimately, UPSERT a Hive/HBase
>> table with this data.
>>
>> Please let me know if you have any ideas.
>>
>> Thanks,
>> Ben
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: alter table add columns alternatives or hive refresh

2016-04-15 Thread Mich Talebzadeh
Looks plausible. Glad it helped.

Personally I prefer ORC tables as they are arguably a better fit for
columnar tables. Others may differ on this :)

Cheers

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 15 April 2016 at 20:45, Maurin Lenglart  wrote:

> Hi,
>
> Following your answer I was able to make it work.
> FYI:
> Basically the solution is to manually create the table in Hive using a SQL
> “CREATE TABLE” command.
> When doing a saveAsTable, the Hive metastore doesn’t get the info of the df.
> So now my flow is:
>
>- Create a dataframe
>- If it is the first time I see the table, I generate a CREATE TABLE
>using the DF.schema.fields.
>- If it is not:
>   - I do a diff of my df schema and myTable schema
>   - I do a SQL "ALTER TABLE ADD COLUMNS” for the table
>   - Use a df.withColumn for each column that is missing in the df
>- Then I use df.insertInto myTable
>
> I also migrated from Parquet to ORC, not sure if this has an impact or not.
>
> Thank you for your help.
>
> From: Mich Talebzadeh 
> Date: Sunday, April 10, 2016 at 11:54 PM
> To: maurin lenglart 
> Cc: "user @spark" 
> Subject: Re: alter table add columns alternatives or hive refresh
>
> This should work. Make sure that you use HiveContext.sql and sqlContext
> correctly
>
> This is an example in Spark, reading a CSV file, doing some manipulation,
> creating a temp table, saving data as ORC file, adding another column and
> inserting values to table in Hive with default values for new rows
>
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.functions._
> //
>   val conf = new SparkConf().
>setAppName("ImportCSV").
>setMaster("local[12]").
>set("spark.driver.allowMultipleContexts", "true").
>set("spark.hadoop.validateOutputSpecs", "false")
>   val sc = new SparkContext(conf)
>
>
>   // Create sqlContext based on HiveContext
>   val sqlContext = new HiveContext(sc)
>   import sqlContext.implicits._
>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>   //
>   // Get a DF first based on Databricks CSV libraries
>   //
>   val df =
> HiveContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("/data/stg/table2")
>   //
>   // Next filter out empty rows (last column has to be > "" and get rid of
> "?" special character. Also get rid of "," in money fields
>   // Example csv cell £2,500.00 --> need to transform to plain 2500.00
>   //
>   val a = df.
>   filter(col("Total") > "").
>   map(x => (x.getString(0),x.getString(1),
> x.getString(2).substring(1).replace(",", "").toDouble,
> x.getString(3).substring(1).replace(",", "").toDouble,
> x.getString(4).substring(1).replace(",", "").toDouble))
>//
>// convert this RDD to DF and create a Spark temporary table
>//
>
>    a.toDF.registerTempTable("tmp")
>   //
>   // Need to create and populate target ORC table t3 in database test in
> Hive
>   //
>   HiveContext.sql("use test")
>   HiveContext.sql("DROP TABLE IF EXISTS test.t3")
>   var sqltext : String = ""
>   sqltext = """
>   CREATE TABLE test.t3 (
>INVOICENUMBER  String
>   ,PAYMENTDATE   String
>   ,NET   DOUBLE
>   ,VAT   DOUBLE
>   ,TOTAL  DOUBLE
>   )
>   COMMENT 'from csv file from excel sheet'
>   STORED AS ORC
>   TBLPROPERTIES ( "orc.compress"="ZLIB" )
>   """
>   HiveContext.sql(sqltext)
>   // Note you can only see Spark temporary table in sqlContext NOT
> HiveContext
>   val results = sqlContext.sql("SELECT * FROM tmp")
>   // clean up the file in HDFS directory first if exists
>   val hadoopConf = new org.apache.hadoop.conf.Configuration()
>   val hdfs = org.apache.hadoop.fs.FileSystem.get(new
> java.net.URI("hdfs://rhes564:9000"), hadoopConf)
>   val output = "hdfs://rhes564:9000/user/hive/warehouse/test.db/t3"   //
> The path for Hive table just created
>   try { hdfs.delete(new org.apache.hadoop.fs.Path(output), true) } catch {
> case _ : Throwable => { } }
>
>
>   results.write.format("orc").save(output)
>   //
>   sqlContext.sql("ALTER TABLE test.t3 ADD COLUMNS (new_col VARCHAR(30))")
>   sqlContext.sql("INSERT INTO test.t3 SELECT *, 'London' FROM tmp")
>   HiveContext.sql("SELECT * FROM test.t3 ORDER BY
> 1").collect.foreach(println)
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 11

Re: alter table add columns alternatives or hive refresh

2016-04-15 Thread Maurin Lenglart
Hi,

Following your answer I was able to make it work.
FYI:
Basically the solution is to manually create the table in Hive using a SQL
“CREATE TABLE” command.
When doing a saveAsTable, the Hive metastore doesn’t get the info of the df.
So now my flow is:

  *   Create a dataframe
  *   If it is the first time I see the table, I generate a CREATE TABLE using 
the DF.schema.fields.
  *   If it is not:
 *   I do a diff of my df schema and myTable schema
 *   I do a SQL "ALTER TABLE ADD COLUMNS” for the table
 *   Use a df.withColumn for each column that is missing in the df
  *   Then I use df.insertInto myTable

I also migrated from Parquet to ORC, not sure if this has an impact or not.

Thank you for your help.
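
A rough sketch of that flow in code, in case it helps someone else (assuming a HiveContext and a Hive-managed target table that already exists; the helper and its names are illustrative only, and since insertInto matches columns by position the DataFrame is re-selected in the table's column order at the end):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.hive.HiveContext

def appendWithSchemaEvolution(hiveContext: HiveContext, df: DataFrame, tableName: String): Unit = {
  val tableSchema = hiveContext.table(tableName).schema

  // 1) Columns in the DataFrame but not yet in the Hive table: ALTER TABLE ADD COLUMNS.
  val newColumns = df.schema.fields.filterNot(f => tableSchema.fieldNames.contains(f.name))
  if (newColumns.nonEmpty) {
    val colDefs = newColumns.map(f => s"`${f.name}` ${f.dataType.simpleString}").mkString(", ")
    hiveContext.sql(s"ALTER TABLE $tableName ADD COLUMNS ($colDefs)")
  }

  // 2) Columns in the table but missing from the DataFrame: fill with nulls.
  val missingInDf = tableSchema.fields.filterNot(f => df.schema.fieldNames.contains(f.name))
  val filled = missingInDf.foldLeft(df)((d, f) => d.withColumn(f.name, lit(null).cast(f.dataType)))

  // 3) Re-read the (possibly widened) table schema and align column order before the insert.
  val finalOrder = hiveContext.table(tableName).schema.fieldNames.map(col)
  filled.select(finalOrder: _*).write.mode("append").insertInto(tableName)
}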

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, April 10, 2016 at 11:54 PM
To: maurin lenglart <mau...@cuberonlabs.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: alter table add columns alternatives or hive refresh

This should work. Make sure that you use HiveContext.sql and sqlContext 
correctly

This is an example in Spark, reading a CSV file, doing some manipulation, 
creating a temp table, saving data as ORC file, adding another column and 
inserting values to table in Hive with default values for new rows

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
//
  val conf = new SparkConf().
   setAppName("ImportCSV").
   setMaster("local[12]").
   set("spark.driver.allowMultipleContexts", "true").
   set("spark.hadoop.validateOutputSpecs", "false")
  val sc = new SparkContext(conf)
  // Create sqlContext based on HiveContext
  val sqlContext = new HiveContext(sc)
  import sqlContext.implicits._
  val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  //
  // Get a DF first based on Databricks CSV libraries
  //
  val df = 
HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")
  //
  // Next filter out empty rows (last column has to be > "" and get rid of "?" 
special character. Also get rid of "," in money fields
  // Example csv cell £2,500.00 --> need to transform to plain 2500.00
  //
  val a = df.
  filter(col("Total") > "").
  map(x => (x.getString(0),x.getString(1), 
x.getString(2).substring(1).replace(",", "").toDouble, 
x.getString(3).substring(1).replace(",", "").toDouble, 
x.getString(4).substring(1).replace(",", "").toDouble))
   //
   // convert this RDD to DF and create a Spark temporary table
   //
   a.toDF.registerTempTable("tmp")
  //
  // Need to create and populate target ORC table t3 in database test in Hive
  //
  HiveContext.sql("use test")
  HiveContext.sql("DROP TABLE IF EXISTS test.t3")
  var sqltext : String = ""
  sqltext = """
  CREATE TABLE test.t3 (
   INVOICENUMBER  String
  ,PAYMENTDATE   String
  ,NET   DOUBLE
  ,VAT   DOUBLE
  ,TOTAL  DOUBLE
  )
  COMMENT 'from csv file from excel sheet'
  STORED AS ORC
  TBLPROPERTIES ( "orc.compress"="ZLIB" )
  """
  HiveContext.sql(sqltext)
  // Note you can only see Spark temporary table in sqlContext NOT HiveContext
  val results = sqlContext.sql("SELECT * FROM tmp")
  // clean up the file in HDFS directory first if exists
  val hadoopConf = new org.apache.hadoop.conf.Configuration()
  val hdfs = org.apache.hadoop.fs.FileSystem.get(new 
java.net.URI("hdfs://rhes564:9000"), hadoopConf)
  val output = "hdfs://rhes564:9000/user/hive/warehouse/test.db/t3"   // The 
path for Hive table just created
  try { hdfs.delete(new org.apache.hadoop.fs.Path(output), true) } catch { case 
_ : Throwable => { } }

  results.write.format("orc").save(output)
//
  sqlContext.sql("ALTER TABLE test.t3 ADD COLUMNS (new_col VARCHAR(30))")
  sqlContext.sql("INSERT INTO test.t3 SELECT *, 'London' FROM tmp")
  HiveContext.sql("SELECT * FROM test.t3 ORDER BY 1").collect.foreach(println)

HTH



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 11 April 2016 at 01:36, Maurin Lenglart <mau...@cuberonlabs.com> wrote:
Your solution works in hive, but not in spark, even if I use hive context.
I tried to create a temp table and then this query:
 - sqlContext.sql("insert into table myTable select * from myTable_temp”)
But I still get the same error.

thanks

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, April 10, 2016 at 12:25 PM
To: "user @spark" <user@spark.apache.org>

Subject: Re: alter table add columns alternatives or hive refresh

Hi,

I am confining myself to Hive tables. As I stated it before I have not tried it 
in Sp

Re: How many disks for spark_local_dirs?

2016-04-15 Thread Jan Rock
Hi, 

Is it a physical server or AWS/Azure? What parameters are you passing to the
spark-shell command? Which Hadoop distro/version and Spark version?

Kind Regards,
Jan


> On 15 Apr 2016, at 16:15, luca_guerra  wrote:
> 
> Hi,
> I'm looking for a solution to improve my Spark cluster performances, I have
> read from http://spark.apache.org/docs/latest/hardware-provisioning.html:
> "We recommend having 4-8 disks per node", I have tried both with one and two
> disks but I have seen that with 2 disks the execution time is doubled. Any
> explanations about this?
> 
> This is my configuration:
> 1 machine with 140 GB RAM 2 disks and 32 CPU (I know that is an unusual
> configuration) and on this I have a standalone Spark cluster with 1 Worker.
> 
> Thank you very much for the help.
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-many-disks-for-spark-local-dirs-tp26790.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Shuffle service fails to register driver - Spark - Mesos

2016-04-15 Thread Jo Voordeckers
Hi all,

I've got mesos in coarse grained mode with dyn alloc, shuffle service
enabled and am running the shuffle service on every mesos slave.

I'm assuming I misconfigured something on the scheduler service, any ideas?

On my driver I see a few of these, I guess it's one for every executor:

19:12:29 WARN [shuffle-client-1] MesosExternalShuffleClient - Unable to
> register app 78a944c9-3a89-4334-bca3-7108aadb1798- with external
> shuffle service. Please manually remove shuffle data after driver exit.
> Error: java.lang.RuntimeException: java.lang.UnsupportedOperationException:
> Unexpected message:
> org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver@c399c59a
> at
> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:92)
> at
> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:68)
> at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
> at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>
[...]

In the scheduler service I see logs like this on every box:

log4j:WARN No appenders could be found for logger
> (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> more info.
> Using Spark's repl log4j profile:
> org/apache/spark/log4j-defaults-repl.properties
> To adjust logging level use sc.setLogLevel("INFO")
> 16/04/14 19:12:29 ERROR TransportRequestHandler: Error while invoking
> RpcHandler#receive() on RPC id 7280403447531815366
> java.lang.UnsupportedOperationException: Unexpected message:
> org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver@c399c59a
> at
> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:92)
> at
> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:68)
> at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
> at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>
[...]

Thanks!

- Jo Voordeckers


Spark Standalone with SPARK_CLASSPATH in spark-env.sh and "spark.driver.userClassPathFirst"

2016-04-15 Thread Yong Zhang



Hi, 
I found a problem with using "spark.driver.userClassPathFirst" together with
SPARK_CLASSPATH in spark-env.sh in a Standalone environment, and want to confirm
that it in fact has no good solution.

We are running Spark 1.5.2 in standalone mode on a cluster. Since the cluster
doesn't have direct internet access, we always add additional common jars to
SPARK_CLASSPATH in the spark-env.sh file, so they are available to the Spark
client and executors by default, e.g. "spark-avro, spark-cassandra-connector,
commons-csv, spark-csv", etc.

We pick these jars carefully, so they all work fine with Spark 1.5.2, without
any issue.

But the problem comes when a user wants to use the Spark shell with their own
jars: if those jars have compatibility issues with the jars above, we get a
problem on the driver side.

The user starts the Spark shell in the following way, as I suggested:

/opt/spark/bin/spark-shell . --conf
spark.driver.userClassPathFirst=true --conf
spark.executor.userClassPathFirst=true

But this still produces a "java.lang.NoSuchMethodError", which I understand is
due to the version mismatch.

What I don't understand is the driver end. If I check the driver JVM command:

 ps -ef | grep spark

I can clearly see the Spark shell process launched this way:

/opt/java8/bin/java -cp
all_jars_specified_in_SPARK_CLASSPATH:all_jars_under_opt_spark_lib --conf
spark.executor.userClassPathFirst=true --conf
spark.driver.userClassPathFirst=true --jars end_user_supply_jars spark-shell

If the spark-shell (or the driver) JVM is launched in the above way, what is
the point of configuring "spark.driver.userClassPathFirst"? There is no way the
driver can control the classpath so that end-user jars supplied through "--jars"
override any Spark jars.

Am I misunderstanding the meaning of "spark.driver.userClassPathFirst", or is it
simply not possible to override any class in /opt/spark/lib and SPARK_CLASSPATH?
If so, what is "spark.driver.userClassPathFirst" for?
Yong
  

Re: Will nested field performance improve?

2016-04-15 Thread Michael Armbrust
>
> If we expect fields nested in structs to always be much slower than flat
> fields, then I would be keen to address that in our ETL pipeline with a
> flattening step. If it's a known issue that we expect will be fixed in
> upcoming releases, I'll hold off.
>

The difference might be even larger in Spark 2.0 (because we really
optimize the simple case).  However, I would expect this to go away when we
fully columnarize the execution engine.  That could take a while though.
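
For anyone who goes the flattening route in the meantime, a minimal sketch (it handles nested structs only, not arrays or maps; the helper names are illustrative):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// Recursively collect the fully qualified path of every leaf field.
def leafPaths(schema: StructType, prefix: String = ""): Seq[String] =
  schema.fields.flatMap { f =>
    val path = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
    f.dataType match {
      case s: StructType => leafPaths(s, path)
      case _             => Seq(path)
    }
  }

// Promote each leaf to a top-level column, e.g. address.city -> address_city.
def flatten(df: DataFrame): DataFrame = {
  val cols = leafPaths(df.schema).map(p => col(p).alias(p.replace(".", "_")))
  df.select(cols: _*)
}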


Re: Spark sql not pushing down timestamp range queries

2016-04-15 Thread Kiran Chitturi
Mich,

I am curious as well about how Spark handles casts for the different types of filters.

For example, the conversion happens implicitly for the 'EqualTo' filter:

scala> sqlContext.sql("SELECT * from events WHERE `registration` =
> '2015-05-28'").explain()
>
> 16/04/15 11:44:15 INFO ParseDriver: Parsing command: SELECT * from events
> WHERE `registration` = '2015-05-28'
>
> 16/04/15 11:44:15 INFO ParseDriver: Parse Completed
>
> 16/04/15 11:44:15 INFO SolrRelation: Constructed SolrQuery:
> q=*:*&rows=1000&collection=demo&fq=registration:"2015-05-28T07:00:00.000Z"
>
> == Physical Plan ==
>
> Filter (registration#17 = 14327964)
> +- Scan 
> com.lucidworks.spark.SolrRelation@483f70bf[method#0,author#1,location#2,song#3,timestamp#4,auth#5,page#6,userAgent#7,lastName#8,firstName#9,id#10,itemInSession#11L,artist#12,status#13L,sessionId#14L,length#15,_version_#16L,registration#17,userId#18L,title#19,gender#20,level#21]
> PushedFilters: [EqualTo(registration,2015-05-28 00:00:00.0)]


whereas for the other filters 'GreaterThan', 'GreaterThanOrEqual', 'LessThan',
'LessThanOrEqual', the column is cast to String.

scala> sqlContext.sql("SELECT * from events WHERE `registration` >=
> '2015-05-28 00.00.00'").explain()
> 16/04/15 11:45:13 INFO ParseDriver: Parsing command: SELECT * from events
> WHERE `registration` >= '2015-05-28 00.00.00'
> 16/04/15 11:45:13 INFO ParseDriver: Parse Completed
> == Physical Plan ==
> Filter (cast(registration#17 as string) >= 2015-05-28 00.00.00)
> +- Scan com.lucidworks.spark.SolrRelation@483f70bf
> [method#0,author#1,location#2,song#3,timestamp#4,auth#5,page#6,userAgent#7,lastName#8,firstName#9,id#10,itemInSession#11L,artist#12,status#13L,sessionId#14L,length#15,_version_#16L,registration#17,userId#18L,title#19,gender#20,level#21]



How does Hive handle date/timestamp queries? Can you share any docs
regarding this?

I have found this in Hive docs
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select#LanguageManualSelect-PartitionBasedQueries
.

SELECT page_views.*
> FROM page_views
> WHERE page_views.date >= '2008-03-01' AND page_views.date <= '2008-03-31'


In the above SQL query, is there any implicit casting to String, or does
Hive try to convert it to a Date/Timestamp and then do the comparison?
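
One thing I may try (just a sketch, I have not verified whether the resulting filter is actually handed to the Solr data source) is making the literal an explicit timestamp, so the cast sits on the constant rather than on the column:

// Compare against an explicit TIMESTAMP literal instead of a bare string, so the
// analyzer should not need to cast the `registration` column to STRING.
sqlContext.sql(
  """SELECT * FROM events
    |WHERE `registration` >= CAST('2015-05-28 00:00:00' AS TIMESTAMP)""".stripMargin
).explain()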

 Thanks,

On Fri, Apr 15, 2016 at 1:53 AM, Mich Talebzadeh 
wrote:

> Thanks Takeshi,
>
> I did check it. I believe you are referring to this statement
>
> "This is likely because we cast this expression weirdly to be compatible
> with Hive. Specifically I think this turns into, CAST(c_date AS STRING)
> >= "2016-01-01", and we don't push down casts down into data sources.
>
> The reason for casting this way is because users currently expect the
> following to work c_date >= "2016". "
>
> There are two issues here:
>
>1. The CAST expression is not pushed down
>2. I still don't trust the string comparison of dates. It may or may
>not work. I recall this as an issue in Hive
>
> '2012-11-23' is not a DATE; it is a string. In general, from my
> experience one should not try to compare a DATE with a string. The results
> will depend on several factors, some related to the tool, some related to
> the session.
>
> For example the following will convert a date as string format into a DATE
> type in Hive
>
>
> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd'))
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 15 April 2016 at 06:58, Takeshi Yamamuro  wrote:
>
>> Hi, Mich
>>
>> Did you check the URL Josh referred to?;
>> the cast for string comparisons is needed for accepting `c_date >=
>> "2016"`.
>>
>> // maropu
>>
>>
>> On Fri, Apr 15, 2016 at 10:30 AM, Hyukjin Kwon 
>> wrote:
>>
>>> Hi,
>>>
>>>
>>> String comparison itself is pushed down fine but the problem is to deal
>>> with Cast.
>>>
>>>
>>> It was pushed down before, but it was reverted (
>>> https://github.com/apache/spark/pull/8049).
>>>
>>> Several fixes were tried here,
>>> https://github.com/apache/spark/pull/11005 etc., but none of them
>>> made it in.
>>>
>>>
>>> To cut it short, it is not being pushed down because it is unsafe to
>>> resolve the cast (e.g. long to integer).
>>>
>>> As a workaround, the implementation of the Solr data source would need to be
>>> changed to one based on CatalystScan, which receives all the filters.
>>>
>>> But CatalystScan is not designed to be binary compatible across
>>> releases; however, it looks like some think it is stable now, as mentioned here:
>>> https://github.com/apache/spark/pull/10750#issuecomment-175400704.
>>>
>>>
>>> Thanks!
>>>
>>>
>>> 2016-04-15 3:30 GMT+09:00 Mich Talebzadeh :
>>>
 Hi Josh,

 Can you please clarify whether date comparisons as two strings work at
 all?

 I was under the impr

Re: Logging in executors

2016-04-15 Thread Ted Yu
See this thread: http://search-hadoop.com/m/q3RTtsFrd61q291j1

On Fri, Apr 15, 2016 at 5:38 AM, Carlos Rojas Matas 
wrote:

> Hi guys,
>
> any clue on this? Clearly the
> spark.executor.extraJavaOpts=-Dlog4j.configuration is not working on the
> executors.
>
> Thanks,
> -carlos.
>
> On Wed, Apr 13, 2016 at 2:48 PM, Carlos Rojas Matas 
> wrote:
>
>> Hi Yong,
>>
>> thanks for your response. As I said in my first email, I've tried both
>> the reference to the classpath resource (env/dev/log4j-executor.properties)
>> as the file:// protocol. Also, the driver logging is working fine and I'm
>> using the same kind of reference.
>>
>> Below the content of my classpath:
>>
>> [image: Inline image 1]
>>
>> Plus this is the content of the exploded fat jar assembled with sbt
>> assembly plugin:
>>
>> [image: Inline image 2]
>>
>>
>> This folder is at the root level of the classpath.
>>
>> Thanks,
>> -carlos.
>>
>> On Wed, Apr 13, 2016 at 2:35 PM, Yong Zhang  wrote:
>>
>>> Is the env/dev/log4j-executor.properties file within your jar file? Is
>>> the path matching with what you specified as
>>> env/dev/log4j-executor.properties?
>>>
>>> If you read the log4j document here:
>>> https://logging.apache.org/log4j/1.2/manual.html
>>>
>>> When you specify the log4j.configuration=my_custom.properties, you have
>>> 2 option:
>>>
>>> 1) the my_custom.properties has to be in the jar (or in the classpath).
>>> In your case, since you specify the package path, you need to make sure
>>> they are matched in your jar file
>>> 2) use like log4j.configuration=file:///tmp/my_custom.properties. In
>>> this way, you need to make sure file my_custom.properties exists in /tmp
>>> folder on ALL of your worker nodes.
>>>
>>> Yong
>>>
>>> --
>>> Date: Wed, 13 Apr 2016 14:18:24 -0300
>>> Subject: Re: Logging in executors
>>> From: cma...@despegar.com
>>> To: yuzhih...@gmail.com
>>> CC: user@spark.apache.org
>>>
>>>
>>> Thanks for your response Ted. You're right, there was a typo. I changed
>>> it, now I'm executing:
>>>
>>> bin/spark-submit --master spark://localhost:7077 --conf
>>> "spark.driver.extraJavaOptions=-Dlog4j.configuration=env/dev/log4j-driver.properties"
>>> --conf
>>> "spark.executor.extraJavaOptions=-Dlog4j.configuration=env/dev/log4j-executor.properties"
>>> --class
>>>
>>> The content of this file is:
>>>
>>> # Set everything to be logged to the console
>>> log4j.rootCategory=INFO, FILE
>>> log4j.appender.console=org.apache.log4j.ConsoleAppender
>>> log4j.appender.console.target=System.err
>>> log4j.appender.console.layout=org.apache.log4j.PatternLayout
>>> log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
>>> %c{1}: %m%n
>>>
>>> log4j.appender.FILE=org.apache.log4j.RollingFileAppender
>>> log4j.appender.FILE.File=/tmp/executor.log
>>> log4j.appender.FILE.ImmediateFlush=true
>>> log4j.appender.FILE.Threshold=debug
>>> log4j.appender.FILE.Append=true
>>> log4j.appender.FILE.MaxFileSize=100MB
>>> log4j.appender.FILE.MaxBackupIndex=5
>>> log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
>>> log4j.appender.FILE.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
>>> %c{1}: %m%n
>>>
>>> # Settings to quiet third party logs that are too verbose
>>> log4j.logger.org.spark-project.jetty=WARN
>>>
>>> log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
>>> log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
>>> log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
>>> log4j.logger.org.apache.parquet=ERROR
>>> log4j.logger.parquet=ERROR
>>> log4j.logger.com.despegar.p13n=DEBUG
>>>
>>> # SPARK-9183: Settings to avoid annoying messages when looking up
>>> nonexistent UDFs in SparkSQL with Hive support
>>> log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
>>> log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
>>>
>>>
>>> Finally, the code on which I'm using logging in the executor is:
>>>
>>> def groupAndCount(keys: DStream[(String, List[String])])(handler: 
>>> ResultHandler) = {
>>>
>>>   val result = keys.reduceByKey((prior, current) => {
>>> (prior ::: current)
>>>   }).flatMap {
>>> case (date, keys) =>
>>>   val rs = keys.groupBy(x => x).map(
>>>   obs =>{
>>> val (d,t) = date.split("@") match {
>>>   case Array(d,t) => (d,t)
>>> }
>>> import org.apache.log4j.Logger
>>> import scala.collection.JavaConverters._
>>> val logger: Logger = Logger.getRootLogger
>>> logger.info(s"Metric retrieved $d")
>>> Metric("PV", d, obs._1, t, obs._2.size)
>>> }
>>>   )
>>>   rs
>>>   }
>>>
>>>   result.foreachRDD((rdd: RDD[Metric], time: Time) => {
>>> handler(rdd, time)
>>>   })
>>>
>>> }
>>>
>>>
>>> Originally the import and logger object was outside the map function.
>>> I'm also using the root logger just to see if it's working, but nothing
>>> gets logged. I've check

Re: How do i get a spark instance to use my log4j properties

2016-04-15 Thread Demi Ben-Ari
Hi Steve,
I wrote a blog post on the configuration of Spark that we've used,
including the log4j.properties:
http://progexc.blogspot.co.il/2014/12/spark-configuration-mess-solved.html
(What we did was distribute the relevant log4j.properties file to the same
location on all of the slaves.)

Hope this helps, Good luck!

Quoting from the text in the post:
---
The right way to pass the parameter is through the properties
“spark.driver.extraJavaOptions” and “spark.executor.extraJavaOptions”:
I’ve passed both the log4j configuration property and the parameter that I
needed for the configuration. (To the driver I was able to pass only the
log4j configuration.)
For example (written in a properties file passed to spark-submit with
“--properties-file”):

spark.driver.extraJavaOptions
-Dlog4j.configuration=file:///spark/conf/log4j.properties
spark.executor.extraJavaOptions
-Dlog4j.configuration=file:///spark/conf/log4j.properties
-Dapplication.properties.file=hdfs:///some/path/on/hdfs/app.properties
spark.application.properties.file hdfs:///some/path/on/hdfs/app.properties

On Tue, Apr 12, 2016 at 7:51 PM, Steve Lewis  wrote:

> Ok I am stymied. I have tried everything I can think of to get spark to
> use my own version of
>
> log4j.properties
>
> In the launcher code - I launch a local instance from a Java application
>
> I say -Dlog4j.configuration=conf/log4j.properties
>
> where conf/log4j.properties is user.dir - no luck
>
> Spark always starts saying
>
> Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
>
> I have a directory conf with my log4j.properties there but it seems to be
> ignored
>
> I use maven and an VERY RELUCTANT to edit the spark jars
>
> I know this point has been discussed here before but I do not see a clean
> answer
>
>
>
>
>
>
>


-- 
Best regards,
Demi Ben-Ari 
Entrepreneur @ Stealth Mode Startup
Twitter: @demibenari 


Re: Spark sql not pushing down timestamp range queries

2016-04-15 Thread Kiran Chitturi
Thanks Hyukjin for the suggestion. I will take a look at implementing Solr
datasource with CatalystScan.


​


Re: JSON Usage

2016-04-15 Thread Benjamin Kim
Holden,

If I were to use DataSets, then I would essentially do this:

val receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl)
val messages = sqs.receiveMessage(receiveMessageRequest).getMessages()
for (message <- messages.asScala) {
val files = sqlContext.read.json(message.getBody())
}

Can I simply do files.toDS(), or do I have to create a schema using a case class 
File and apply it with as[File]? If I have to apply a schema, then how would I 
create it based on the JSON structure below, especially the nested elements?

Thanks,
Ben
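
For context, this is roughly the case-class route I had in mind, based on the JSON structure below (only a subset of the fields; `object` is back-ticked because it is a Scala keyword, and none of this is tested):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.explode

// Partial model of the S3 event notification shown below; field names mirror the JSON keys.
case class S3Bucket(name: String, arn: String)
case class S3Object(key: String, size: Long, eTag: String)
case class S3Entity(s3SchemaVersion: String, bucket: S3Bucket, `object`: S3Object)
case class S3Record(eventVersion: String, eventSource: String, awsRegion: String,
                    eventName: String, s3: S3Entity)

def parseEvents(sqlContext: SQLContext, body: String) = {
  import sqlContext.implicits._
  // read.json takes files or an RDD[String] in 1.6, so wrap the SQS message body.
  val df = sqlContext.read.json(sqlContext.sparkContext.parallelize(Seq(body)))
  // Flatten the Records array and map each element onto the case classes.
  df.select(explode($"Records").as("r"))
    .select($"r.*")
    .as[S3Record]
}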


> On Apr 14, 2016, at 3:46 PM, Holden Karau  wrote:
> 
> You could certainly use RDDs for that, you might also find using Dataset 
> selecting the fields you need to construct the URL to fetch and then using 
> the map function to be easier.
> 
> On Thu, Apr 14, 2016 at 12:01 PM, Benjamin Kim  > wrote:
> I was wonder what would be the best way to use JSON in Spark/Scala. I need to 
> lookup values of fields in a collection of records to form a URL and download 
> that file at that location. I was thinking an RDD would be perfect for this. 
> I just want to hear from others who might have more experience in this. Below 
> is the actual JSON structure that I am trying to use for the S3 bucket and 
> key values of each “record" within “Records".
> 
> {
>"Records":[
>   {
>  "eventVersion":"2.0",
>  "eventSource":"aws:s3",
>  "awsRegion":"us-east-1",
>  "eventTime":The time, in ISO-8601 format, for example, 
> 1970-01-01T00:00:00.000Z, when S3 finished processing the request,
>  "eventName":"event-type",
>  "userIdentity":{
> 
> "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
>  },
>  "requestParameters":{
> "sourceIPAddress":"ip-address-where-request-came-from"
>  },
>  "responseElements":{
> "x-amz-request-id":"Amazon S3 generated request ID",
> "x-amz-id-2":"Amazon S3 host that processed the request"
>  },
>  "s3":{
> "s3SchemaVersion":"1.0",
> "configurationId":"ID found in the bucket notification 
> configuration",
> "bucket":{
>"name":"bucket-name",
>"ownerIdentity":{
>   "principalId":"Amazon-customer-ID-of-the-bucket-owner"
>},
>"arn":"bucket-ARN"
> },
> "object":{
>"key":"object-key",
>"size":object-size,
>"eTag":"object eTag",
>"versionId":"object version if bucket is versioning-enabled, 
> otherwise null",
>"sequencer": "a string representation of a hexadecimal value 
> used to determine event sequence,
>only used with PUTs and DELETEs"
> }
>  }
>   },
>   {
>   // Additional events
>   }
>]
> }
> 
> Thanks
> Ben
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau 


Re: How many disks for spark_local_dirs?

2016-04-15 Thread Mich Talebzadeh
Is that 32 CPUs or 32 cores?

So in this configuration, assuming 32 cores, you have 1 worker with how much
memory (after deducting memory for the OS, etc.) and 32 cores?

What is the ratio of memory per core in this case?

HTH

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 15 April 2016 at 16:15, luca_guerra  wrote:

> Hi,
> I'm looking for a solution to improve my Spark cluster performances, I have
> read from http://spark.apache.org/docs/latest/hardware-provisioning.html:
> "We recommend having 4-8 disks per node", I have tried both with one and
> two
> disks but I have seen that with 2 disks the execution time is doubled. Any
> explanations about this?
>
> This is my configuration:
> 1 machine with 140 GB RAM 2 disks and 32 CPU (I know that is an unusual
> configuration) and on this I have a standalone Spark cluster with 1 Worker.
>
> Thank you very much for the help.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-many-disks-for-spark-local-dirs-tp26790.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: EMR Spark log4j and metrics

2016-04-15 Thread Peter Halliday
I wonder if anyone can confirm whether Spark on YARN is the problem here, or
whether it is how AWS has put it together.  I'm wondering if Spark on YARN has
problems with configuration files for the workers and driver.


Peter Halliday

On Thu, Apr 14, 2016 at 1:09 PM, Peter Halliday  wrote:

> An update to this is that I can see the log4j.properties files and the
> metrics.properties files correctly on the master.  When I submit a Spark
> Step that runs Spark in deploy mode of cluster, I see the cluster files
> being zipped up and pushed via HDFS to the driver and workers.  However, I
> don't see evidence that the configuration files are read or used after
> they are pushed.
>
> On Wed, Apr 13, 2016 at 11:22 AM, Peter Halliday 
> wrote:
>
>> I have an existing cluster that I stand up via Docker images and
>> CloudFormation Templates  on AWS.  We are moving to EMR and AWS Data
>> Pipeline process, and having problems with metrics and log4j.  We’ve sent a
>> JSON configuration for spark-log4j and spark-metrics.  The log4j file seems
>> to be basically working for the master.  However, the driver and executors
>> it isn’t working for.  I’m not sure why.  Also, the metrics aren’t working
>> anywhere. It’s using a cloud watch to log the metrics, and there’s no
>> CloudWatch Sink for Spark it seems on EMR, and so we created one that we
>> added to a jar than’s sent via —jars to spark-submit.
>>
>> Peter Halliday
>
>
>


Re: can spark-csv package accept strings instead of files?

2016-04-15 Thread Benjamin Kim
Hi Hyukjin,

I saw that. I don’t know how to use it. I’m still learning Scala on my own. Can 
you help me to start?

Thanks,
Ben

> On Apr 15, 2016, at 8:02 AM, Hyukjin Kwon  wrote:
> 
> I hope it was not too late :).
> 
> It is possible.
> 
> Please check csvRdd api here, 
> https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150
>  
> .
> 
> Thanks!
> 
> On 2 Apr 2016 2:47 a.m., "Benjamin Kim"  > wrote:
> Does anyone know if this is possible? I have an RDD loaded with rows of CSV 
> data strings. Each string representing the header row and multiple rows of 
> data along with delimiters. I would like to feed each thru a CSV parser to 
> convert the data into a dataframe and, ultimately, UPSERT a Hive/HBase table 
> with this data.
> 
> Please let me know if you have any ideas.
> 
> Thanks,
> Ben
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 



How many disks for spark_local_dirs?

2016-04-15 Thread luca_guerra
Hi,
I'm looking for a solution to improve my Spark cluster performances, I have
read from http://spark.apache.org/docs/latest/hardware-provisioning.html:
"We recommend having 4-8 disks per node", I have tried both with one and two
disks but I have seen that with 2 disks the execution time is doubled. Any
explanations about this?

This is my configuration:
1 machine with 140 GB RAM 2 disks and 32 CPU (I know that is an unusual
configuration) and on this I have a standalone Spark cluster with 1 Worker.

Thank you very much for the help.
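
For reference, this is how I am pointing Spark at the disks (paths are placeholders; the same value can also be set via spark.local.dir in spark-defaults.conf, and in standalone mode SPARK_LOCAL_DIRS in spark-env.sh on the worker takes precedence):

import org.apache.spark.{SparkConf, SparkContext}

// Comma-separated list of scratch directories, ideally one per physical disk,
// used for shuffle files and RDD spills.
val conf = new SparkConf()
  .setAppName("local-dirs-example")
  .set("spark.local.dir", "/mnt/disk1/spark-tmp,/mnt/disk2/spark-tmp")

val sc = new SparkContext(conf)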




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-many-disks-for-spark-local-dirs-tp26790.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: In-Memory Only Spark Shuffle

2016-04-15 Thread Hyukjin Kwon
This reminds me of this Jira,
https://issues.apache.org/jira/browse/SPARK-3376 and this PR,
https://github.com/apache/spark/pull/5403.

AFAIK, it is not and won't be supported.
On 2 Apr 2016 4:13 a.m., "slavitch"  wrote:

> Hello;
>
> I’m working on spark with very large memory systems (2TB+) and notice that
> Spark spills to disk in shuffle.  Is there a way to force spark to stay
> exclusively in memory when doing shuffle operations?   The goal is to keep
> the shuffle data either in the heap or in off-heap memory (in 1.6.x) and
> never touch the IO subsystem.  I am willing to have the job fail if it runs
> out of RAM.
>
> spark.shuffle.spill true  is deprecated in 1.6 and does not work in
> Tungsten
> sort in 1.5.x
>
> "WARN UnsafeShuffleManager: spark.shuffle.spill was set to false, but this
> is ignored by the tungsten-sort shuffle manager; its optimized shuffles
> will
> continue to spill to disk when necessary.”
>
> If this is impossible via configuration changes what code changes would be
> needed to accomplish this?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/In-Memory-Only-Spark-Shuffle-tp26661.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: can spark-csv package accept strings instead of files?

2016-04-15 Thread Hyukjin Kwon
I hope it was not too late :).

It is possible.

Please check csvRdd api here,
https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150
.

Thanks!
On 2 Apr 2016 2:47 a.m., "Benjamin Kim"  wrote:

> Does anyone know if this is possible? I have an RDD loaded with rows of
> CSV data strings. Each string representing the header row and multiple rows
> of data along with delimiters. I would like to feed each thru a CSV parser
> to convert the data into a dataframe and, ultimately, UPSERT a Hive/HBase
> table with this data.
>
> Please let me know if you have any ideas.
>
> Thanks,
> Ben
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark RDD.take() generating duplicates for AvroData

2016-04-15 Thread Anoop Shiralige
Hi All,

I have some avro data, which I am reading in the following way.

Query:

> val data = sc.newAPIHadoopFile(file,
>   classOf[AvroKeyInputFormat[GenericRecord]],
>   classOf[AvroKey[GenericRecord]],
>   classOf[NullWritable]).map(_._1.datum)

But, when I try to print the data, it is generating duplicates.

> data.take(10).foreach(println)


One of the workarounds I found was to RDD.repartition(10) the data, upon
which I get the samples without any duplicates. I have read this post:
http://stackoverflow.com/questions/35951616/repeat-duplicate-records-with-avro-and-spark.
But this does not solve my problem. I am curious to know the reason for
this behaviour.

Thank you for your time,
AnoopShiralige
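
A guess at the cause, for anyone hitting the same thing (it matches the Stack Overflow post above, though it is not confirmed in this thread): Hadoop input formats reuse the same key/value objects for every record, so keeping a reference to _._1.datum without copying can leave many RDD elements pointing at the same, repeatedly mutated record. The usual fix is to deep-copy each datum inside the map, sketched below with sc and file as in the query above:

import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable

// Deep-copy each Avro datum so the reused Hadoop key object is not shared
// across elements of the RDD.
val data = sc.newAPIHadoopFile(file,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])
  .map { case (key, _) =>
    val datum = key.datum()
    GenericData.get().deepCopy(datum.getSchema, datum)
  }

data.take(10).foreach(println)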


Re: How to stop hivecontext

2016-04-15 Thread Ted Yu
You can call the stop() method.

> On Apr 15, 2016, at 5:21 AM, ram kumar  wrote:
> 
> Hi,
> I started hivecontext as,
> 
> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
> 
> I want to stop this sql context
> 
> Thanks

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Logging in executors

2016-04-15 Thread Carlos Rojas Matas
Hi guys,

any clue on this? Clearly the
spark.executor.extraJavaOpts=-Dlog4j.configuration is not working on the
executors.

Thanks,
-carlos.

On Wed, Apr 13, 2016 at 2:48 PM, Carlos Rojas Matas 
wrote:

> Hi Yong,
>
> thanks for your response. As I said in my first email, I've tried both the
> reference to the classpath resource (env/dev/log4j-executor.properties) as
> the file:// protocol. Also, the driver logging is working fine and I'm
> using the same kind of reference.
>
> Below the content of my classpath:
>
> [image: Inline image 1]
>
> Plus this is the content of the exploded fat jar assembled with sbt
> assembly plugin:
>
> [image: Inline image 2]
>
>
> This folder is at the root level of the classpath.
>
> Thanks,
> -carlos.
>
> On Wed, Apr 13, 2016 at 2:35 PM, Yong Zhang  wrote:
>
>> Is the env/dev/log4j-executor.properties file within your jar file? Is
>> the path matching with what you specified as
>> env/dev/log4j-executor.properties?
>>
>> If you read the log4j document here:
>> https://logging.apache.org/log4j/1.2/manual.html
>>
>> When you specify log4j.configuration=my_custom.properties, you have 2
>> options:
>>
>> 1) the my_custom.properties file has to be in the jar (or on the classpath).
>> In your case, since you specify the package path, you need to make sure
>> it matches the layout inside your jar file.
>> 2) use something like log4j.configuration=file:///tmp/my_custom.properties. In this
>> way, you need to make sure the file my_custom.properties exists in the /tmp folder
>> on ALL of your worker nodes.
>>
>> Yong
>>
>> --
>> Date: Wed, 13 Apr 2016 14:18:24 -0300
>> Subject: Re: Logging in executors
>> From: cma...@despegar.com
>> To: yuzhih...@gmail.com
>> CC: user@spark.apache.org
>>
>>
>> Thanks for your response Ted. You're right, there was a typo. I changed
>> it, now I'm executing:
>>
>> bin/spark-submit --master spark://localhost:7077 --conf
>> "spark.driver.extraJavaOptions=-Dlog4j.configuration=env/dev/log4j-driver.properties"
>> --conf
>> "spark.executor.extraJavaOptions=-Dlog4j.configuration=env/dev/log4j-executor.properties"
>> --class
>>
>> The content of this file is:
>>
>> # Set everything to be logged to the console
>> log4j.rootCategory=INFO, FILE
>> log4j.appender.console=org.apache.log4j.ConsoleAppender
>> log4j.appender.console.target=System.err
>> log4j.appender.console.layout=org.apache.log4j.PatternLayout
>> log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
>> %c{1}: %m%n
>>
>> log4j.appender.FILE=org.apache.log4j.RollingFileAppender
>> log4j.appender.FILE.File=/tmp/executor.log
>> log4j.appender.FILE.ImmediateFlush=true
>> log4j.appender.FILE.Threshold=debug
>> log4j.appender.FILE.Append=true
>> log4j.appender.FILE.MaxFileSize=100MB
>> log4j.appender.FILE.MaxBackupIndex=5
>> log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
>> log4j.appender.FILE.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
>> %c{1}: %m%n
>>
>> # Settings to quiet third party logs that are too verbose
>> log4j.logger.org.spark-project.jetty=WARN
>>
>> log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
>> log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
>> log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
>> log4j.logger.org.apache.parquet=ERROR
>> log4j.logger.parquet=ERROR
>> log4j.logger.com.despegar.p13n=DEBUG
>>
>> # SPARK-9183: Settings to avoid annoying messages when looking up
>> nonexistent UDFs in SparkSQL with Hive support
>> log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
>> log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
>>
>>
>> Finally, the code in which I'm doing the logging on the executors is:
>>
>> def groupAndCount(keys: DStream[(String, List[String])])(handler: ResultHandler) = {
>>
>>   val result = keys.reduceByKey((prior, current) => {
>>     prior ::: current
>>   }).flatMap {
>>     case (date, keys) =>
>>       val rs = keys.groupBy(x => x).map(
>>         obs => {
>>           val (d, t) = date.split("@") match {
>>             case Array(d, t) => (d, t)
>>           }
>>           import org.apache.log4j.Logger
>>           import scala.collection.JavaConverters._
>>           val logger: Logger = Logger.getRootLogger
>>           logger.info(s"Metric retrieved $d")
>>           Metric("PV", d, obs._1, t, obs._2.size)
>>         }
>>       )
>>       rs
>>   }
>>
>>   result.foreachRDD((rdd: RDD[Metric], time: Time) => {
>>     handler(rdd, time)
>>   })
>>
>> }
>>
>>
>> Originally the import and the logger object were outside the map function. I'm
>> also using the root logger just to see if it's working, but nothing gets
>> logged. I've checked that the property is set correctly on the executor
>> side through println(System.getProperty("log4j.configuration")) and it is OK,
>> but logging is still not working.
>>
>> Thanks again,
>> -carlos.
>>
>
>


How to stop hivecontext

2016-04-15 Thread ram kumar
Hi,
I started hivecontext as,

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);

I want to stop this sql context

Thanks


Unable To access Hive From Spark

2016-04-15 Thread Amit Singh Hora
Hi All,

I am trying to access Hive from Spark but I am getting the exception:
"The root scratch dir: /tmp/hive on HDFS should be writable. Current
permissions are: rw-rw-rw-"

Code :-

String logFile = "hdfs://hdp23ha/logs"; // Should be some file on your system
System.setProperty("HADOOP_USER_NAME", "hadoop");
SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

sc.hadoopConfiguration().set("fs.defaultFS", "hdfs://hdp23ha");
sc.hadoopConfiguration().set("hive.metastore.warehouse.dir", "/apps/hive/warehouse");
sc.hadoopConfiguration().set("hive.exec.local.scratchdir", "D://");
sc.hadoopConfiguration().set("dfs.nameservices", "hdp23ha");
sc.hadoopConfiguration().set("hive.exec.scratchdir", "/tmp/hive/");
sc.hadoopConfiguration().setInt("hive.exec.scratchdir.permission", 777);
sc.hadoopConfiguration().set("dfs.ha.namenodes.hdp23ha", "nn1,nn2");
sc.hadoopConfiguration().set("dfs.namenode.rpc-address.hdp23ha.nn1", "ambarimaster:8020");
sc.hadoopConfiguration().set("dfs.namenode.rpc-address.hdp23ha.nn2", "hdp231:8020");
sc.hadoopConfiguration().set("dfs.client.failover.proxy.provider.hdp23ha",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

HiveContext hiveContext = new org.apache.spark.sql.hive.HiveContext(sc);
JavaRDD<String> logData = sc.textFile(logFile).cache();

hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS Logs (date STRING, msg STRING) STORED AS ORC");

JavaRDD<Logs> logsRDD = logData.map(new Function<String, Logs>() {
    public Logs call(String arg0) throws Exception {
        String[] array = arg0.split(",");
        Logs logs = new Logs(array[0], array[1]);
        return logs;
    }
});

// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame schemaPeople = sqlContext.createDataFrame(logsRDD, Logs.class);
schemaPeople.registerTempTable("logs");

DataFrame results = sqlContext.sql("SELECT * FROM logs");

results.write().format("orc").saveAsTable("Logs");



Any help would be greatly appreciated.
Thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-To-access-Hive-From-Spark-tp26788.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




[Help]:Strange Issue :Debug Spark Dataframe code

2016-04-15 Thread Divya Gehlot
Hi,
I am using Spark 1.5.2 with Scala 2.10.
Is there any other option apart from "explain(true)" to debug Spark
DataFrame code?

I am facing a strange issue.
I have a lookup dataframe and I am using it to join another dataframe on
different columns.

I am getting an *Analysis exception* in the third join.
When I checked the logical plan, it is using the same reference for the key,
but the column references change when the columns are selected.
For example
df1 = COLUMN1#15,COLUMN2#16,COLUMN3#17

In the first two joins
I am getting the same references and the join works:
for the column COLUMN1#15 I am getting COLUMN2#16 and
COLUMN3#17.

But at the third join COLUMN1#15 is the same while the other column references
have changed to COLUMN2#167, COLUMN3#168.

It is throwing a Spark AnalysisException:

> org.apache.spark.sql.AnalysisException: resolved attribute(s) COLUMN1#15
> missing from


After two joins, the dataframe has more than 25 columns.
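
(One workaround commonly suggested for this family of AnalysisException, sketched here
with invented dataframe and column names since the real schema is not shown: re-alias
the lookup dataframe's columns before the join that fails, so the join sees fresh
attribute references.)

import org.apache.spark.sql.functions.col

// lookupDF and thirdDF are hypothetical stand-ins for the dataframes in the real job
val lookupForThirdJoin = lookupDF.select(
  col("COLUMN1").as("lkp_COLUMN1"),
  col("COLUMN2").as("lkp_COLUMN2"),
  col("COLUMN3").as("lkp_COLUMN3"))

val joined = thirdDF.join(lookupForThirdJoin,
  thirdDF("joinKey") === lookupForThirdJoin("lkp_COLUMN1"))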

Could anybody help light the path by holding the torch.
Would really appreciate the help.

Thanks,
Divya


Re: Spark sql not pushing down timestamp range queries

2016-04-15 Thread Mich Talebzadeh
Thanks Takeshi,

I did check it. I believe you are referring to this statement

"This is likely because we cast this expression weirdly to be compatible
with Hive. Specifically I think this turns into, CAST(c_date AS STRING) >=
"2016-01-01", and we don't push down casts down into data sources.

The reason for casting this way is because users currently expect the
following to work c_date >= "2016". "

There are two issues here:

   1. The CAST expression is not pushed down
   2. I still don't trust the string comparison of dates. It may or may not
   work. I recall this as an issue in Hive

'2012-11-23' is not a DATE; it is a string. In general, from my
experience, one should not try to compare a DATE with a string. The results
will depend on several factors, some related to the tool, some related to
the session.

For example, the following will convert a date held in string format into a DATE
type in Hive:

TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd'))
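
In Spark SQL the equivalent is to compare against an explicit TIMESTAMP value rather than
a bare string, for example (a sketch only, not verified against the Solr data source
discussed in this thread):

sqlContext.sql(
  "SELECT * FROM events WHERE registration >= CAST('2015-05-28 00:00:00' AS TIMESTAMP) " +
  "AND registration <= CAST('2015-05-29 00:00:00' AS TIMESTAMP)")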

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 15 April 2016 at 06:58, Takeshi Yamamuro  wrote:

> Hi, Mich
>
> Did you check the URL Josh referred to?;
> the cast for string comparisons is needed for accepting `c_date >= "2016"`.
>
> // maropu
>
>
> On Fri, Apr 15, 2016 at 10:30 AM, Hyukjin Kwon 
> wrote:
>
>> Hi,
>>
>>
>> String comparison itself is pushed down fine but the problem is to deal
>> with Cast.
>>
>>
>> It was pushed down before but it was reverted (
>> https://github.com/apache/spark/pull/8049).
>>
>> Several fixes were tried, e.g. https://github.com/apache/spark/pull/11005
>> and others, but none of them made it in.
>>
>>
>> To cut it short, it is not being pushed down because it is unsafe to
>> resolve the cast (e.g. long to integer).
>>
>> As a workaround, the implementation of the Solr data source should be
>> changed to one based on CatalystScan, which takes all the filters.
>>
>> But CatalystScan is not designed to be binary compatible across releases;
>> however, it looks like some think it is stable now, as mentioned here:
>> https://github.com/apache/spark/pull/10750#issuecomment-175400704.
>>
>>
>> Thanks!
>>
>>
>> 2016-04-15 3:30 GMT+09:00 Mich Talebzadeh :
>>
>>> Hi Josh,
>>>
>>> Can you please clarify whether date comparisons as two strings work at
>>> all?
>>>
>>> I was under the impression is that with string comparison only first
>>> characters are compared?
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 14 April 2016 at 19:26, Josh Rosen  wrote:
>>>
 AFAIK this is not being pushed down because it involves an implicit
 cast and we currently don't push casts into data sources or scans; see
 https://github.com/databricks/spark-redshift/issues/155 for a
 possibly-related discussion.

 On Thu, Apr 14, 2016 at 10:27 AM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Are you comparing strings in here or timestamp?
>
> Filter ((cast(registration#37 as string) >= 2015-05-28) &&
> (cast(registration#37 as string) <= 2015-05-29))
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 14 April 2016 at 18:04, Kiran Chitturi <
> kiran.chitt...@lucidworks.com> wrote:
>
>> Hi,
>>
>> Timestamp range filter queries in SQL are not getting pushed down to
>> the PrunedFilteredScan instances. The filtering is happening at the Spark
>> layer.
>>
>> The physical plan for timestamp range queries is not showing the
>> pushed filters, whereas range queries on other types are working fine, as the
>> physical plan is showing the pushed filters.
>>
>> Please see below for code and examples.
>>
>> *Example:*
>>
>> *1.* Range filter queries on Timestamp types
>>
>>*code: *
>>
>>> sqlContext.sql("SELECT * from events WHERE `registration` >=
>>> '2015-05-28' AND `registration` <= '2015-05-29' ")
>>
>>*Full example*:
>> https://github.com/lucidworks/spark-solr/blob/master/src/test/scala/com/lucidworks/spark/EventsimTestSuite.scala#L151
>> *plan*:
>> https://gist.github.com/kiranchitturi/4a52688c9f0abe3d4b2bd8b938044421#file-time-range-sql
>>
>> *2. * Range filter queries on Long types
>>
>> *code*:
>>
>>

Will nested field performance improve?

2016-04-15 Thread James Aley
Hello,

I'm trying to make a call on whether my team should invest time adding a
step to "flatten" our schema as part of our ETL pipeline to improve the
performance of interactive queries.

Our data start out life as Avro before being converted to Parquet, and so
we follow the Avro idioms of creating our own types to reduce boilerplate
in many areas. For example, every record we define has a "metadata" struct
field with all the fields that are common to all records as part of the
system design. Those fields are very common, and so virtually all queries
need to access them. As a result, nearly all of our queries don't see the
best performance we could be seeing in Spark SQL, etc.

So my question - is this just inherently the way it is, or do we expect
future releases will put them on a par with flat fields? The reason I ask
is that I've actually seen similar differences in performance with Presto
too. In benchmarks for both Spark and Presto, I generally see queries
working on flat fields run 5-6x faster than queries doing the same thing on
a nested field.

If we expect fields nested in structs to always be much slower than flat
fields, then I would be keen to address that in our ETL pipeline with a
flattening step. If it's a known issue that we expect will be fixed in
upcoming releases, I'll hold off.
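
For concreteness, the flattening step being weighed up would look something like this
(the column names are invented for the example):

import org.apache.spark.sql.functions.col

// promote the commonly queried struct members to top-level columns before writing Parquet
val flattened = df.select(
  col("metadata.id").as("metadata_id"),
  col("metadata.timestamp").as("metadata_timestamp"),
  col("payload"))

flattened.write.parquet("/warehouse/events_flat")   // hypothetical output path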

Any advice greatly appreciated!

Thanks,

James.